O processamento de dados em tempo real, como o próprio nome diz, é a prática de lidar com o fluxo de dados capturados em tempo real e processados com latência mínima para gerar relatórios instantâneos ou, até mesmo, para produzir respostas automatizadas à um determinado evento.
Hoje, vamos desenvolver uma aplicação bem simples para a ingestão e o processamento de dados em tempo real com o Spark Structured Streaming e o Apache Kafka 🎲. Como este tutorial tem como objetivo ser uma breve introdução à essas tecnologias, vamos desenvolver um simples contador de palavras. Nada muito elaborado ou complexo 😥.
Caso queira ver um exemplo mais completo, confira um outro artigo que escrevi: uma análise de dados em tempo real com base em dados de tráfego aéreo.
Análise de dados de tráfego aéreo em tempo real com Spark Structured Streaming e Apache Kafka
Geazi Anc ・ Oct 28
E aí, se interessou? Então continue lendo!
Você também pode conferir este projeto em meu GitHub 😉.
O que é Spark Structured Streaming e Apache Kafka?
O Spark Structured Streaming é um módulo do PySpark que facilita a criação de aplicativos e pipelines de streaming com as mesmas e familiares APIs do Spark. O Spark Structured Streaming abstrai conceitos complexos de streaming, como processamento incremental, pontos de verificação e marcas d'água, para que você possa criar aplicativos e pipelines de streaming sem aprender novos conceitos ou ferramentas ❇.
Já o O Apache Kafka é uma plataforma de streaming de eventos distribuídos de código aberto usada por milhares de empresas para pipelines de dados de alto desempenho, análise de streaming, integração de dados e aplicativos de missão crítica.
Os eventos, que podem ser dados capturados em tempo real, são enviados à um tópico do Kafka. Fazendo uma analogia: um tópico é como se fosse uma pasta de arquivos em seu computador, e os eventos são os arquivos desta pasta.
Um produtor, ou producer, é responsável por enviar eventos de streaming à um ou mais tópicos do Kafka. Já um consumidor, ou consumer, é responsável por se inscrever em um ou mais tópicos do Kafka e ler ou processar tais eventos enviados pelo produtor.
Sugiro a leitura da documentação que introduz esses conceitos com mais detalhes 😉.
Agora que já entendemos um pouco sobre os conceitos abordados nesse tutorial, podemos começar a desenvolver nossa aplicação 👏🏼.
Arquitetura
Calma lá! Antes de começarmos, vamos conhecer um pouco sobre a arquitetura de nosso projeto. Ela é composta pelos seguintes componentes lógicos, assim como as tecnologias que serão utilizadas:
- Armazenamento de dados analíticos: é onde nossos dados ficarão armazenados. Para isso, iremos criar um tópico no Apache Kafka para que, posteriormente, possamos consumir esses dados que estarão sendo enviados em tempo real pelo produtor.
- Ingestão de dados: um produtor desenvolvido em Python que irá enviar palavras aleatórias em tempo real ao tópico criado no Kafka.
- Consumo de dados: um consumidor desenvolvido em Python que tem como objetivo apenas monitorar as palavras que estão chegando em tempo real ao tópico do Kafka.
- Processamento de fluxo e análise: uma aplicação desenvolvida com o PySpark que irá consumir em tempo real as palavras enviadas pelo produtor ao tópico do Kafka. É esta aplicação que irá agregar as palavras e gerar um relatório da contagem de tais palavras.
Criando o ambiente de desenvolvimento
Este tutorial assume que você já tenha o PySpark instalado em sua máquina. Caso ainda não tenha, confira as etapas na própria documentação.
Já para o Apache Kafka, vamos utilizar ele por meio de conteinerização via Docker 🎉🐳.
E, por fim, vamos utilizar o Python através de um ambiente virtual.
Apache Kafka por conteinerização via Docker
Sem mais delongas, crie uma pasta chamada data-streaming-sample e adicione o arquivo docker-compose.yml dentro dela.
$ mkdir data-streaming-sample
$ cd data-streaming-sample
$ touch docker-compose.yml
Agora, adicione o seguinte conteúdo dentro do arquivo docker-compose:
version: '3.9'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Feito! Já podemos subir nosso servidor do Kafka. Para isso, digite o seguinte comando no terminal:
$ docker compose up -d
$ docker compose ps
NAME COMMAND SERVICE STATUS PORTS
data-streaming-sample-kafka-1 "/etc/confluent/dock…" kafka running 9092/tcp, 0.0.0.0:29092->29092/tcp
data-streaming-sample-zookeeper-1 "/etc/confluent/dock…" zookeeper running 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Observação: este tutorial está utilizando a versão 2.0 do Docker Compose. É por este motivo que não há o "-" entre docker e compose ☺.
Agora, precisamos criar um tópico dentro do Kafka que irá armazenar as palavras enviadas em tempo real pelo produtor. Para isso, vamos acessar o Kafka dentro do contêiner:
$ docker compose exec kafka bash
E enfim criar o tópico, chamado de words.
$ kafka-topics --create --topic words --bootstrap-server localhost:29092
Created topic words.
Criação do ambiente virtual
Para desenvolvermos nosso produtor, ou seja, a aplicação que será responsável por enviar as palavras em tempo real para o tópico do Kafka, precisamos fazer o uso da biblioteca kafka-python. O kafka-python é uma biblioteca desenvolvida pela comunidade que nos permite desenvolver produtores e consumidores que se integram com o Apache Kafka.
Primeiro, vamos criar um arquivo chamado requirements.txt e adicionar a seguinte dependência dentro dele:
kafka-python
Segundo, vamos criar um ambiente virtual e instalar as dependências no arquivo requirements.txt:
$ python -m venv venv
$ venv\scripts\activate
$ pip install -r requirements.txt
Feito! Agora sim nosso ambiente já está pronto para o desenvolvimento 🚀.
Desenvolvimento do produtor
Vamos criar nosso produtor. Como foi dito, um produtor é responsável por enviar os dados em tempo real para um tópico no Kafka. Este produtor irá enviar aleatoriamente uma dentre quatro palavras ao tópico words que criamos anteriormente, em um intervalo de tempo aleatório entre um e cinco segundos.
Para isso, criamos uma instância da classe KafkaProducer. Esta classe recebe dois parâmetros:
- bootstrap_servers: o servidor onde está rodando o Kafka. Neste caso, ele está rodando no localhost, na porta 29092, conforme configuramos no arquivo docker-compose.
- value_serializer: umma função que serializa os dados em bits para serem enviados para o Kafka. Neste caso, a função recebe uma string e retorna um objeto do tipo Bytes.
Depois, utilizamos o método send para enviar os dados ao tópico. Ele recebe dois parâmetros: o tópico que os dados serão enviados e os dados propriamente dito.
Vamos criar um diretório chamado src e um subdiretório chamado kafka. Dentro do diretório kafka, vamos criar um arquivo chamado producer.py e adicionar o seguinte código dentro dele:
import random
from time import sleep
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers="localhost:29092",
value_serializer=lambda x: x.encode("utf-8")
)
while True:
words = [
"spark",
"kafka",
"streaming",
"python"
]
word = random.choice(words)
future = producer.send("words", value=word)
print(future.get(timeout=60))
sleep(random.randint(1, 6))
E já podemos executar nosso produtor. Você pode interromper a execução a qualquer momento pressionando CTRL + C.
$ python producer.py
RecordMetadata(topic='words', partition=0, topic_partition=TopicPartition(topic='words', partition=0), offset=0, timestamp=1664469827519, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=6, serialized_header_size=-1)
RecordMetadata(topic='words', partition=0, topic_partition=TopicPartition(topic='words', partition=0), offset=1, timestamp=1664469833559, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=6, serialized_header_size=-1)
RecordMetadata(topic='words', partition=0, topic_partition=TopicPartition(topic='words', partition=0), offset=2, timestamp=1664469838567, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1)
RecordMetadata(topic='words', partition=0, topic_partition=TopicPartition(topic='words', partition=0), offset=3, timestamp=1664469842582, log_start_offset=0, checksum=None, serialized_key_size=-1, serialized_value_size=6, serialized_header_size=-1)
...
Desenvolvimento do consumidor
Vamos criar nosso consumidor. Como foi dito antes, e conforme explicado na arquitetura, um consumidor é responsável por se inscrever em um tópico e ler os eventos que são enviados até ele em tempo real. Nosso consumidor simplesmente irá monitorar as palavras que chegam até o tópico words.
Para isso, vamos criar uma instância da classe KafkaConsumer. Esta classe recebe três parâmetros:
- O tópico que queremos que o consumidor se inscreva. Neste caso, o tópico words.
- bootstrap_servers: o servidor onde está rodando o Kafka. Neste caso, ele está rodando no localhost, na porta 29092, conforme configuramos no arquivo docker-compose.
- value_deserializer: umma função que deserializa os dados de bits para string.
Ainda no diretório kafka, vamos criar um arquivo chamado consumer.py e adicionar o seguinte código dentro dele:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"words",
bootstrap_servers="localhost:29092",
value_deserializer=lambda x: x.decode("utf-8")
)
for msg in consumer:
print(msg.value)
Vamos executar nosso consumidor. Certifique-se que o produtor ainda esteja rodando, hein!
$ python consumer.py
kafka
streaming
kafka
streaming
spark
...
E finalizamos o desenvolvimento de nosso produtor e consumidor. Bem simples, não? ☺
Desenvolvimento do processamento de dados em tempo real com Spark Structured Streaming
Esta é uma etapa bem simples, também. Vamos criar uma aplicação com o PySpark que irá se inscrever no tópico words e processar em tempo real os dados que chegam até ele.
Primeiro, com base na instância da classe SparkSession, vamos utilizar o método .writeStream. Depois disso, chamamos uma série de métodos encadeados, entre eles format e options. O format iremos dizer a ele que vamos ler dados em tempo real do Kafka. Já os métodos options informamos o servidor onde está rodando o servidor Kafka, o tópico que ele irá consumir e o modo que ele deve consumir, ou seja, do mais antigo para o mais recente.
Mãos à obra! Vamos criar um novo arquivo no diretório src chamado word_counts.py e adicionar o seguinte código dentro dele:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = (SparkSession.builder
.appName("Words Count Analysis")
.getOrCreate()
)
df1 = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:29092")
.option("subscribe", "words")
.option("startingOffsets", "earliest")
.load()
)
df2 = df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
lines = df2.select(F.explode(F.split(df2.value, " ")).alias("word"))
word_counts = (lines.groupBy("word")
.count()
.orderBy("count", ascending=False)
)
(word_counts.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
)
E já podemos executar nosso arquivo através do spark-submit. Mas calma lá! Quando estamos integrando o PySpark com o Kafka, devemos executar o spark-submit de modo diferente. É necessário que informemos o pacote do Apache Kafka e a versão atual do Apache Spark através do parâmetro --packages.
Caso seja a primeira vez que esteja integrando o Apache Spark com o Apache Kafka, talvez a execução do spark-submit demore um pouco. Isso ocorre porque ele precisa fazer o download dos pacotes necessários.
Vamos lá! Sei que seus dedos estão coçando para executar o projeto e ver tudo isso em ação. Portanto, digite no terminal, dentro do diretório src:
`spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 word_counts.py
E veja o resultado!
`
+---------+-----+
| word|count|
+---------+-----+
| kafka| 11|
| python| 10|
| spark| 7|
|streaming| 6|
+---------+-----+
`
Observe que o dataframe será constantemente atualizado conforme o producer envia novos dados ao tópico. Deixe o produtor rodando por um tempo e veja por você mesmo 😉.
Considerações finais
E acabamos por aqui, pessoal. Neste tutorial ensinei como processar dados em tempo real com Spark Structured Streaming e Apache Kafka.
Para isso, desenvolvemos um simples contador de palavras, que agrega as palavras consumidas de um tópico do Kafka e exibe a contagem dessas palavras de forma decrescente. Também desenvolvemos um produtor, que envia dados em tempo real constantemente à um tópico do Kafka e um consumidor, que apenas monitora o tópico na medida que novos dados são enviados pelo produtor.
Espero que tenham gostado. Até a próxima 💚!
Top comments (0)