Atualmente, vivemos em um mundo onde peta bytes de dados são gerados a cada segundo. Como tal, a análise e o processamento desses dados em tempo real torna-se mais do que essencial para uma empresa que busca gerar insights de negócios com mais precisão conforme dados e mais dados são produzidos.
Hoje, vamos desenvolver uma análise de dados em tempo real com base em dados fictícios de um tráfego aéreo utilizando Spark Structured Streaming e Apache Kafka. Caso não saiba o que são essas tecnologias, sugiro a leitura de meu artigo que escrevi introduzindo elas com mais detalhes, assim como outros conceitos que serão abordados no decorrer desse artigo. Então, não esquece de conferir lá 💚.
Uma breve Introdução ao processamento de dados em tempo real com Spark Structured Streaming e Apache Kafka
Geazi Anc ・ Sep 29 '22
Você pode conferir o projeto completo em meu GitHub.
Arquitetura
Pois bem, imagine que você, pessoa engenheira de dados, trabalhe em uma empresa aérea chamada de SkyX, onde a cada segundo dados sobre o tráfego aéreo são gerados.
Você foi solicitada para desenvolver uma dashboard que exibe em tempo real dados desses voos, como um rank das cidades mais visitadas no exterior; as cidades onde mais saem pessoas; e as aeronaves que mais transportam pessoas ao redor do mundo.
Esses são os dados que são gerados a cada voo:
- aircraft_name: nome da aeronave. Na SkyX, só existem apenas cinco aeronaves disponíveis.
- From: cidade de onde a aeronave está partindo. A SkyX só realiza voos entre cinco cidades ao redor do mundo.
- To: cidade de destino da aeronave. Como foi dito, a SkyX só realiza voos entre cinco cidades ao redor do mundo.
- Passengers: quantidade de passageiros que a aeronave está transportando. Todas as aeronaves da SkyX transportam entre 50 e 100 pessoas a cada voo.
A seguir está a arquitetura básica de nosso projeto:
- Produtor: responsável por produzir dados do tráfego aéreo das aeronaves e enviá-los à um tópico do Apache Kafka.
- Consumidor: apenas observa os dados que chegam em tempo real ao tópico do Apache Kafka.
- Análise de dados: três dashboards que processam e analisam em tempo real os dados que chegam no tópico do Apache Kafka. Análise das cidades que mais recebem turistas; análise das cidades que mais saem pessoas para visitar outras cidades; e análise das aeronaves da SkyX que mais transportam pessoas entre as cidades ao redor do mundo.
Preparando o ambiente de desenvolvimento
Este tutorial assume que você já tenha o PySpark instalado em sua máquina. Caso ainda não tenha, confira as etapas na própria documentação.
Já para o Apache Kafka, vamos utilizar ele por meio de conteinerização via Docker 🎉🐳.
E, por fim, vamos utilizar o Python através de um ambiente virtual.
Apache Kafka por conteinerização via Docker
Sem mais delongas, crie uma pasta chamada skyx e adicione o arquivo docker-compose.yml dentro dela.
$ mkdir skyx
$ cd skyx
$ touch docker-compose.yml
Agora, adicione o seguinte conteúdo dentro do arquivo docker-compose:
version: '3.9'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Feito! Já podemos subir nosso servidor do Kafka. Para isso, digite o seguinte comando no terminal:
$ docker compose up -d
$ docker compose ps
NAME COMMAND SERVICE STATUS PORTS
skyx-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp
skyx-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Observação: este tutorial está utilizando a versão 2.0 do Docker Compose. É por este motivo que não há o "-" entre docker e compose ☺.
Agora, precisamos criar um tópico dentro do Kafka que irá armazenar os dados enviados em tempo real pelo produtor. Para isso, vamos acessar o Kafka dentro do contêiner:
$ docker compose exec kafka bash
E enfim criar o tópico, chamado de airtraffic.
$ kafka-topics --create --topic airtraffic --bootstrap-server localhost:29092
Created topic airtraffic.
Criação do ambiente virtual
Para desenvolvermos nosso produtor, ou seja, a aplicação que será responsável por enviar os dados do tráfego aéreo em tempo real para o tópico do Kafka, precisamos fazer o uso da biblioteca kafka-python. O kafka-python é uma biblioteca desenvolvida pela comunidade que nos permite desenvolver produtores e consumidores que se integram com o Apache Kafka.
Primeiro, vamos criar um arquivo chamado requirements.txt e adicionar a seguinte dependência dentro dele:
kafka-python
Segundo, vamos criar um ambiente virtual e instalar as dependências no arquivo requirements.txt:
$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt
Feito! Agora sim nosso ambiente já está pronto para o desenvolvimento 🚀.
Desenvolvimento do produtor
Agora vamos criar nosso produtor. Como foi dito, o produtor será responsável por enviar os dados do tráfego aéreo para o tópico recém criado do Kafka.
Como também foi dito na arquitetura, a SkyX realiza voos apenas entre cinco cidades ao redor do mundo, e tem apenas cinco aeronaves disponíveis 😹. Vale ressaltar que cada aeronave transporta entre 50 e 100 pessoas.
Observe que os dados são gerados de forma aleatória e enviados ao tópico no formato json em um intervalo de tempo entre 1 e 6 segundos 😉.
Vamos lá! Crie um subdiretório chamado src e outro subdiretório chamado kafka. Dentro do diretório kafka, crie um arquivo chamado airtraffic_producer.py e adicione o seguinte código dentro dele:
import random
from json import dumps
from time import sleep
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers="localhost:29092",
value_serializer=lambda x: dumps(x).encode("utf-8")
)
while True:
cities = [
"São Paulo, Brazil",
"Tokyo, Japan",
"Berlin, Germany",
"Rome, Italy",
"Seoul, South Korea"
]
aircraft_names = [
"Convair B-36 Peacemaker",
"Lockheed C-5 Galaxy",
"Northrop B-2 Spirit",
"Boeing B-52 Stratofortress",
"McDonnell XF-85 Goblin"
]
aircraft = {
"aircraft_name": random.choice(aircraft_names),
"from": random.choice(cities),
"to": random.choice(cities),
"passengers": random.randint(50, 101)
}
future = producer.send("airtraffic", value=aircraft)
print(future.get(timeout=60))
sleep(random.randint(1, 6))
Feito! Desenvolvemos nosso produtor. Execute-o e deixe rodando por um tempo.
$ python airtraffic_producer.py
Desenvolvimento do consumidor
Agora vamos desenvolver nosso consumidor. Essa será uma aplicação bem simples. Ela irá apenas exibir no terminal em tempo real os dados que chegam no tópico do kafka.
Ainda dentro do diretório kafka, crie um arquivo chamado airtraffic_consumer.py e adicione o seguinte código dentro dele:
from json import loads
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"airtraffic",
bootstrap_servers="localhost:29092",
value_deserializer=lambda x: loads(x.decode("utf-8"))
)
for msg in consumer:
print(msg.value)
Viu só, eu te disse que era bem simples. Execute-o e observe os dados que serão exibidos em tempo real conforme o produtor envia os dados ao tópico.
$ python airtraffic_consumer.py
Análise de dados: cidades que mais recebem turistas
Agora começamos com nossa análise de dados. Nesse momento, vamos desenvolver uma dashboard, uma aplicação, que irá exibir em tempo real um rank das cidades que mais recebem turistas. Ou seja, iremos agrupar os dados pela coluna to e fazer uma somatória com base na coluna passengers. Bem simples!
Para isso, dentro do diretório src, crie um subdiretório chamado dashboards e crie um arquivo chamado tourists_analysis.py. Em seguida, adicione o seguinte código dentro dele:
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Tourists Analysis")
.getOrCreate()
)
df1 = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:29092")
.option("subscribe", "airtraffic")
.option("startingOffsets", "earliest")
.load()
)
df2 = df1.selectExpr("CAST(value AS STRING)")
aircraft = {
"aircraft_name": "",
"from": "",
"to": "",
"passengers": 0
}
schema = F.schema_of_json(F.lit(json.dumps(aircraft)))
airtraffic = (df2.select(F.from_json(df2.value, schema).alias("jsondata"))
.select("jsondata.*")
)
tourists = (airtraffic.groupBy("to")
.agg({"passengers": "sum"})
.withColumnRenamed("sum(passengers)", "tourists")
.withColumnRenamed("to", "city")
.orderBy("tourists", ascending=False)
)
(tourists.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
)
E já podemos executar nosso arquivo através do spark-submit. Mas calma lá! Quando estamos integrando o PySpark com o Kafka, devemos executar o spark-submit de modo diferente. É necessário que informemos o pacote do Apache Kafka e a versão atual do Apache Spark através do parâmetro --packages.
Caso seja a primeira vez que esteja integrando o Apache Spark com o Apache Kafka, talvez a execução do spark-submit demore um pouco. Isso ocorre porque ele precisa fazer o download dos pacotes necessários.
Certifique-se que o produtor ainda esteja rodando para que possamos ver a análise dos dados em tempo real. Dentro do diretório dashboards, execute o seguinte comando:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 tourists_analysis.py
+------------------+--------+
| city|tourists|
+------------------+--------+
| Rome, Italy| 2628|
| Tokyo, Japan| 2467|
| Berlin, Germany| 2204|
|Seoul, South Korea| 1823|
| São Paulo, Brazil| 1719|
+------------------+--------+
Análise de dados: cidades onde mais saem pessoas
Essa análise é bem semelhante a anterior. Porém, ao invés de analisarmos em tempo real as cidades que mais recebem turistas, vamos analisar as cidades onde mais saem pessoas. Para isso, crie um arquivo chamado leavers_analysis.py e adicione o seguinte código dentro dele:
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Leavers Analysis")
.getOrCreate()
)
df1 = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:29092")
.option("subscribe", "airtraffic")
.option("startingOffsets", "earliest")
.load()
)
df2 = df1.selectExpr("CAST(value AS STRING)")
aircraft = {
"aircraft_name": "",
"from": "",
"to": "",
"passengers": 0
}
schema = F.schema_of_json(F.lit(json.dumps(aircraft)))
airtraffic = (df2.select(F.from_json(df2.value, schema).alias("jsondata"))
.select("jsondata.*")
)
leavers = (airtraffic.groupBy("from")
.agg({"passengers": "sum"})
.withColumnRenamed("sum(passengers)", "leavers")
.withColumnRenamed("from", "city")
.orderBy("leavers", ascending=False)
)
(leavers.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
)
Certifique-se que o produtor ainda esteja rodando para que possamos ver a análise dos dados em tempo real. Dentro do diretório dashboards, execute o seguinte comando:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 leavers_analysis.py
+------------------+-------+
| city|leavers|
+------------------+-------+
| Tokyo, Japan| 2673|
| Berlin, Germany| 2305|
| São Paulo, Brazil| 2096|
|Seoul, South Korea| 1895|
| Rome, Italy| 1872|
+------------------+-------+
Análise de dados: aeronaves que mais transportam passageiros
Essa análise é bem mais simples do que as anteriores. Vamos analisar em tempo real as aeronaves que mais transportam passageiros entre as cidades ao redor do mundo. Crie um arquivo chamado aircrafts_analysis.py e adicione o seguinte código dentro dele:
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Aircrafts Analysis")
.getOrCreate()
)
df1 = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:29092")
.option("subscribe", "airtraffic")
.option("startingOffsets", "earliest")
.load()
)
df2 = df1.selectExpr("CAST(value AS STRING)")
aircraft = {
"aircraft_name": "",
"from": "",
"to": "",
"passengers": 0
}
schema = F.schema_of_json(F.lit(json.dumps(aircraft)))
airtraffic = (df2.select(F.from_json(df2.value, schema).alias("jsondata"))
.select("jsondata.*")
)
aircrafts = (airtraffic.groupBy("aircraft_name")
.agg({"passengers": "sum"})
.withColumnRenamed("sum(passengers)", "total_passengers")
.orderBy("total_passengers", ascending=False)
)
(aircrafts.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
)
Certifique-se que o produtor ainda esteja rodando para que possamos ver a análise dos dados em tempo real. Dentro do diretório dashboards, execute o seguinte comando:
$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 aircrafts_analysis.py
+--------------------+----------------+
| aircraft_name|total_passengers|
+--------------------+----------------+
|McDonnell XF-85 G...| 2533|
|Boeing B-52 Strat...| 2345|
|Convair B-36 Peac...| 2012|
| Lockheed C-5 Galaxy| 2002|
| Northrop B-2 Spirit| 1949|
+--------------------+----------------+
Considerações finais
E finalizamos por aqui, pessoal! Neste artigo desenvolvemos uma análise de dados em tempo real com base em dados fictícios de um tráfego aéreo utilizando o Spark Structured Streaming e o Apache Kafka.
Para isso, desenvolvemos um produtor que envia esses dados em tempo real ao tópico do kafka, e depois desenvolvemos 3 dashboards para analisar esses dados em tempo real.
Espero que tenham gostado. Até a próxima 💚.
Top comments (0)