DEV Community

Mayara Machado
Mayara Machado

Posted on • Updated on

Pytest Mocks, o que são?

Esse texto faz é o primeiro de uma série de textos sobre testes em aplicações de processamento de dados que estarei trazendo aqui e em meu blog pessoal.

Quando fiz minha transição de carreira de engenheira de software para engenheira de dados, passei a ter conversas com pessoas da área de dados que não possuíam uma formação em engenharia de software. Nessas conversas, um questionamento surgia repetidamente: como escrever testes?

Escrever testes pode, de fato, parecer uma tarefa complexa para quem não está acostumado, já que exige uma mudança na forma de escrever código. A verdade é que não há mistério algum, mas sim uma questão de prática e repetição. Meu objetivo principal neste artigo é guiar você, que está começando, em um processo que mostra como podemos criar testes para aplicações que processam dados, garantindo qualidade e confiabilidade no código.

Esse texto faz parte de uma série que estarei trazendo ao longo das próximas semanas onde compartilho como escrever testes automatizados em código voltado para engenharia de dados. No artigo de hoje quero explorar um pouco sobre mocks. Em diversos cenários de código, uma pipeline de dados estará realizando conexões, chamadas de API, integração com serviços da Cloud e etc, o que pode causar uma certa confusão em como podemos testar essa aplicação. Vamos explorar hoje algumas bibliotecas interessantes para escrita de testes focando no uso dos mocks.

Afinal, o que são Mocks?

Mocks são objetos simulados usados em testes para imitar o comportamento de dependências externas ou componentes que não são o foco do teste. Eles permitem que você isole a unidade de código que está sendo testada, garantindo que o teste seja mais controlável e preditivo. A utilização de mocks é uma prática comum em testes unitários e testes de integração.

E devemos usar os mocks quando:

  • A dependência é irrelevante para o teste;
  • A dependência não está disponível;
  • Queremos testar comportamentos especiais, simulações de erros ou respostas específicas.

Em pipelines de dados, o Mocking permite criar representações de componentes externos – como um banco de dados, um serviço de mensageria ou uma API – sem depender de suas infraestruturas reais. Isso é particularmente útil em ambientes de processamento de dados, que integram várias tecnologias, como PySpark para processamento distribuído, Kafka para mensageria, além de serviços em nuvem, como AWS e GCP.

Nesses cenários onde temos pipelines de dados, o Mocking facilita a execução de testes isolados e rápidos, minimizando custos e tempo de execução. Ele permite que cada parte do pipeline seja verificada com precisão, sem falhas intermitentes causadas por conexões reais ou infraestrutura externa, e com a segurança de que cada integração funciona como esperado.

Podemos encontrar em cada linguagem de programação, módulo internos que já disponibilizam funções de Mock para serem implementados. Em Python, a biblioteca nativa unittest.mock é a principal ferramenta para criar mocks, permitindo simular objetos e funções com facilidade e controle. Em Go, o processo de Mocking é comumente suportado por pacotes externos, como o mockery, pois a linguagem não possui uma biblioteca nativa de Mock; o mockery é especialmente útil para gerar mocks a partir de interfaces, um recurso nativo de Go. Já em Java, o Mockito se destaca como uma biblioteca popular e poderosa para criar mocks, integrando-se ao JUnit para facilitar testes unitários robustos. Essas bibliotecas oferecem uma base essencial para testar componentes isolados, especialmente em pipelines de dados e sistemas distribuídos onde a simulação de fontes de dados e APIs externas é crítica.

Implementando Mocks

Vamos começar com um exemplo básico de como podemos usar Mocks. Supondo que temos uma função que realiza chamadas de API e precisamos realizar a escrita de testes unitários para essa função:

def get_data_from_api(url):
    import requests
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return None
Enter fullscreen mode Exit fullscreen mode

Para abordar corretamente os cenários de teste, precisamos primeiro entender quais situações devem ser cobertas. Como nossa função realiza chamadas REST, os testes devem contemplar, no mínimo, dois cenários principais: um em que a requisição é bem-sucedida e outro em que a resposta não é a esperada. Poderíamos executar o código com uma URL real para observar o comportamento, mas essa abordagem apresenta desvantagens, pois não teríamos controle sobre os diferentes tipos de resposta, além de deixar o teste vulnerável a mudanças na resposta da URL ou a sua eventual indisponibilidade. Para evitar essas inconsistências, utilizaremos Mocks.

from unittest import mock

@mock.patch('requests.get')
    def test_get_data_from_api_success(mock_get):
        # Configura o mock para retornar uma resposta simulada
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = {"key": "value"}

        # Chama a função com o mock ativo
        result = get_data_from_api("http://fakeurl.com")

        # Verifica se o mock foi chamado corretamente e o resultado é o esperado
        mock_get.assert_called_once_with("http://fakeurl.com")
        self.assertEqual(result, {"key": "value"})
Enter fullscreen mode Exit fullscreen mode

Com a decoração @mock.patch da biblioteca unittest do Python, podemos substituir a chamada requests.get por um mock, um "objeto falso" que simula o comportamento da função get dentro do contexto de teste, eliminando a dependência externa.

Ao definir valores para o return_value do mock, podemos especificar exatamente o que esperamos que o objeto retorne ao ser chamado na função que estamos testando. É importante que a estrutura do return_value siga a mesma dos objetos reais que estamos substituindo. Por exemplo, um objeto response do módulo requests possui atributos como status_code e métodos como json(). Assim, para simularmos uma resposta da função requests.get, podemos atribuir o valor esperado a esses atributos e métodos diretamente no mock.

response.status_code = mock_get.return_value.status_code
response.json() = mock_get.return_value.json.return_value
Enter fullscreen mode Exit fullscreen mode

Neste caso específico, o foco é simular a resposta da requisição, ou seja, testar o comportamento da função com diferentes resultados esperados sem depender de uma URL externa e sem impacto em nosso ambiente de testes.

from unittest import TestCase, mock

# Função que será testada
def get_data_from_api(url):
    import requests
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return None

# Teste usando Mock
class TestGetDataFromApi(TestCase):
    @mock.patch('requests.get')
    def test_get_data_from_api_success(self, mock_get):
        # Configura o mock para retornar uma resposta simulada
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = {"key": "value"}

        # Chama a função com o mock ativo
        result = get_data_from_api("http://url.com")

        # Verifica se o mock foi chamado corretamente e o resultado é o esperado
        mock_get.assert_called_once_with("http://url.com")
        self.assertEqual(result, {"key": "value"})

    @mock.patch('requests.get')
    def test_get_data_from_api_failure(self, mock_get):
        # Configura o mock para simular uma resposta com erro
        mock_get.return_value.status_code = 404

        # Chama a função com o mock ativo
        result = get_data_from_api("http://url.com")

        # Verifica o comportamento em caso de erro
        mock_get.assert_called_once_with("http://url.com")
        self.assertIsNone(result)

# Execução dos testes (se for usado em script)
if __name__ == '__main__':
    import unittest
    unittest.main()

Enter fullscreen mode Exit fullscreen mode

Ao simular respostas de erro da API em testes, podemos ir além do básico e verificar o comportamento da aplicação em relação a diferentes tipos de códigos de status HTTP, como 404, 401, 500 e 503. Isso proporciona uma cobertura mais ampla e assegura que a aplicação lida adequadamente com cada tipo de falha, entendo de que forma essas variações na chamada podem impactar nossa aplicação/processamento de dados. Em chamadas de método POST, podemos adicionar uma camada extra de validação, verificando não apenas o status_code e o funcionamento básico da chamada, mas também o schema do envio e da resposta recebida, garantindo que os dados retornados sigam o formato esperado. Essa abordagem de teste mais detalhada ajuda a prevenir problemas futuros, assegurando que a aplicação esteja preparada para lidar com uma variedade de cenários de erro e que os dados recebidos estejam sempre de acordo com o que foi projetado.

Implementando Mocks com PySpark

Agora que vimos um caso simples de utilização de Mocks em um código puramente python, vamos expandir nossos casos para um trecho de código que usa Pyspark.

Para testar funções PySpark, especialmente operações em DataFrame como filter, groupBy, e join, o uso de mocks é uma abordagem eficaz que elimina a necessidade de executar o Spark real, reduzindo o tempo de teste e simplificando o ambiente de desenvolvimento. A biblioteca unittest.mock do Python permite simular comportamentos desses métodos, possibilitando verificar o fluxo e a lógica de código sem dependência da infraestrutura Spark.

Vejamos, dado a seguinte função onde temos uma transformação que realiza operação de filtro, groupBy e join em dataframes em Spark.

def transform_data(df, df_other):
    filtered_df = df.filter(df['value'] > 10)
    grouped_df = filtered_df.groupBy('category').count()
    result_df = grouped_df.join(df_other, on="category", how="inner")
    return result_df
Enter fullscreen mode Exit fullscreen mode

Para executar um teste PySpark precisamos que a configuração do Spark seja feita localmente. Essa configuração é feita no método setUpClass, que cria uma instância do Spark que será utilizada em todos os testes da classe. Isso nos permite executar PySpark de forma isolada, possibilitando a realização de operações de transformação reais sem depender de um cluster completo. Após a conclusão dos testes, o método tearDownClass é responsável por finalizar a sessão do Spark, garantindo que todos os recursos sejam liberados adequadamente e que o ambiente de teste seja limpo.

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
import unittest
from unittest import mock

class TestTransformData(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").appName("PySparkTest").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_transform_data(self):
        # Cria dados de entrada
        df_data = [
            ("A", 15),
            ("B", 5),
            ("A", 20),
            ("C", 25)
        ]
        df_other_data = [("A", "description A"), ("C", "description C")]

        # Cria DataFrames a partir dos dados simulados
        df = self.spark.createDataFrame(df_data, ["category", "value"])
        df_other = self.spark.createDataFrame(df_other_data, ["category", "description"])

        # Executa a função com processamento real
        result_df = transform_data(df, df_other)

        # Realiza verificações nas operações do DataFrame resultante
        result_data = result_df.collect()  # Pega todos os dados do DataFrame em uma lista de linhas
        expected_data = [("A", 2, "description A"), ("C", 1, "description C")]

        # Converte os resultados em uma lista de tuplas para fácil comparação
        result_tuples = [(row.category, row['count'], row.description) for row in result_data]
        self.assertEqual(result_tuples, expected_data)
Enter fullscreen mode Exit fullscreen mode

No teste test_transform_data, começamos criando DataFrames de exemplo para df e df_other, os quais contêm os dados que serão utilizados nas transformações. Em seguida, executamos a função transform_data sem aplicar mocks, permitindo que as operações de filter, groupBy e join ocorram de fato e resultem em um novo DataFrame. Após a execução, utilizamos o método collect() para extrair os dados do DataFrame resultante, o que nos permite comparar esses dados com os valores esperados e, assim, validar a transformação realizada de maneira real e precisa.

Mas também podemos ter cenários onde queremos testar o resultado de uma dessas funções do pyspark. Que seja necessário mockar uma outra parte do código que possa estar representando um gargalo no tempo de execução e que não represente um risco para o nosso processo. Sendo assim, podemos nos apoiar da técnica de mockar uma função/módulo, conforme vimos no exemplo anterior utilizando requests.

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
import unittest
from unittest import mock

class TestTransformData(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master("local[1]").appName("PySparkTest").getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    @mock.patch('pyspark.sql.DataFrame.filter')
    def test_transform_data_with_mocked_filter(self, mock_filter):
        # Mock para a operação filter, mantendo o processamento real nas outras etapas
        mock_filtered_df = self.spark.createDataFrame([("A", 15), ("A", 20), ("C", 25)], ["category", "value"])
        mock_filter.return_value = mock_filtered_df

        df_data = [("A", 15), ("B", 5), ("A", 20), ("C", 25)]
        df_other_data = [("A", "description A"), ("C", "description C")]

        df = self.spark.createDataFrame(df_data, ["category", "value"])
        df_other = self.spark.createDataFrame(df_other_data, ["category", "description"])

        # Executa a função, agora com filter mockado
        result_df = transform_data(df, df_other)
        result_data = result_df.collect()

        # Compara com o dado esperado
        expected_data = [("A", 2, "description A"), ("C", 1, "description C")]  # O resultado deve ser ajustado conforme a lógica do teste
        result_tuples = [(row.category, row['count'], row.description) for row in result_data]
        self.assertEqual(result_tuples, expected_data)
Enter fullscreen mode Exit fullscreen mode

O teste com Mock para operação específica foi realizado no método test_transform_data_with_mocked_join, onde aplicamos um mock especificamente para o método filter. Esse mock substitui o resultado da operação de join por um DataFrame simulado, permitindo que as operações anteriores, como groupBy e join, sejam executadas de forma real. O teste, então, compara o DataFrame resultante com o valor esperado, garantindo que o mock para join foi utilizado corretamente, sem interferir nas demais transformações realizadas.

Essa abordagem híbrida traz várias vantagens. Ao garantir que as operações reais de PySpark, como join e groupBy, sejam mantidas, podemos validar a lógica das transformações sem perder a flexibilidade de substituir operações específicas, como filter, com mocks. Isso resulta em testes mais robustos e rápidos, eliminando a necessidade de um cluster Spark completo, o que facilita o desenvolvimento e a validação contínua do código.

É importante ressaltar que essa estratégia deve ser utilizada com cautela e somente em cenários onde não se cria um viés nos resultados. O objetivo do teste é assegurar que o processamento ocorra de forma correta; não devemos simplesmente atribuir valores sem testar a função de fato. Embora seja válido mockar trechos que podemos garantir que não afetarão o processo do teste unitário, é essencial lembrar que a função deve ser executada para validar seu comportamento real.

Assim, a abordagem híbrida faz muito mais sentido quando temos outros tipos de processamento agregado nessa função. Essa estratégia permite uma combinação eficaz de operações reais e simuladas, garantindo testes mais robustos e confiáveis

Concluindo

Os mocks são aliados valiosos na criação de testes eficazes, especialmente quando se trata de trabalhar com PySpark e outros serviços na nuvem. A implementação que exploramos usando unittest em Python não apenas nos ajudou a simular operações, mas também a manter a integridade dos nossos dados e processos. Com a flexibilidade que os mocks oferecem, podemos testar nossos pipelines sem o medo de causar estragos em ambientes de produção. E aí, preparados para o próximo desafio? No nosso próximo texto, vamos mergulhar no mundo das integrações com serviços da AWS e GCP, mostrando como mockar essas chamadas e garantir que suas pipelines funcionem perfeitamente. Até a próxima!

Top comments (0)