Sometimes your Kafka producer code does things that need to be properly validated, and of course we developers resort to writing a test. If the functionality we want to test is nicely encapsulated, we can do that with a unit test. Kafka helps us here by providing a mock implementation of the Producer<K, V> interface called, you guessed it, MockProducer.
Preparation for test
The TransactionProcessor class below is our class under test. It has a process(Transaction) method that receives a Transaction object, which in our example contains only the userId and amount properties. Depending on the amount, the processor decides which topic to write the object to. If the amount is 100,000 or more, it uses the transactions_high_prio topic. Otherwise, it writes the transaction to the transactions_regular_prio topic.
import com.google.gson.Gson;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TransactionProcessor {
    // 100,000
    public static final double HIGH_PRIORITY_THRESHOLD = 100_000.0;

    private final Producer<String, String> kafkaProducer;
    private final String highPrioTopic;
    private final String regularPrioTopic;
    private final Gson gson = new Gson();

    public TransactionProcessor(Producer<String, String> kafkaProducer, String highPrioTopic, String regularPrioTopic) {
        this.kafkaProducer = kafkaProducer;
        this.highPrioTopic = highPrioTopic;
        this.regularPrioTopic = regularPrioTopic;
    }

    public void process(Transaction transaction) {
        // Amounts at or above the threshold go to the high-priority topic.
        String selectedTopic = regularPrioTopic;
        if (transaction.getAmount() >= HIGH_PRIORITY_THRESHOLD) {
            selectedTopic = highPrioTopic;
        }
        // The record is keyed by user ID and carries the transaction as JSON.
        String transactionJson = gson.toJson(transaction);
        ProducerRecord<String, String> record =
                new ProducerRecord<>(selectedTopic, transaction.getUserId(), transactionJson);
        kafkaProducer.send(record);
    }
}
And the Transaction class looks like this:
public class Transaction {
    private String userId;
    private double amount;
    // constructor and getters removed for brevity
}
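Since the constructor and getters are omitted above, here is a minimal, self-contained sketch of what Transaction might look like, together with the routing rule pulled out into a plain function so the boundary case is easy to see. The class name TopicRoutingSketch, the selectTopic helper, and the threshold value of 100,000 are assumptions made for illustration; they are not part of the original code.

```java
// Hypothetical sketch: names and the threshold value are assumptions, not the post's code.
final class TopicRoutingSketch {

    /** Assumed shape of Transaction: two fields, a constructor, and getters. */
    static final class Transaction {
        private final String userId;
        private final double amount;

        Transaction(String userId, double amount) {
            this.userId = userId;
            this.amount = amount;
        }

        String getUserId() {
            return userId;
        }

        double getAmount() {
            return amount;
        }
    }

    /** Returns the high-priority topic when the amount is at or above the threshold. */
    static String selectTopic(Transaction transaction, double threshold,
                              String highPrioTopic, String regularPrioTopic) {
        return transaction.getAmount() >= threshold ? highPrioTopic : regularPrioTopic;
    }

    public static void main(String[] args) {
        double threshold = 100_000.0; // assumed value, for illustration only
        Transaction[] samples = {
            new Transaction("user1", 50.2),
            new Transaction("user2", 100_000.0), // exactly at the threshold
            new Transaction("user3", 250_000.0)
        };
        for (Transaction t : samples) {
            String topic = selectTopic(t, threshold,
                    "transactions_high_prio", "transactions_regular_prio");
            System.out.println(t.getUserId() + " -> " + topic);
        }
    }
}
```

Because the comparison uses >=, a transaction exactly at the threshold is treated as high priority, which is worth covering with its own test case.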
An important thing to notice here is that TransactionProcessor depends on the Producer interface, not on the implementation (the KafkaProducer class). This makes it possible to unit test our processor using MockProducer.
MockProducer in action
OK, now on to the test class. TransactionProcessorTest creates an instance of MockProducer that we will pass to the TransactionProcessor.
class TransactionProcessorTest {
    private static final String HIGH_PRIO_TOPIC = "transactions_high_prio";
    private static final String REGULAR_PRIO_TOPIC = "transactions_regular_prio";

    MockProducer<String, String> mockProducer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
The MockProducer constructor takes a few parameters, namely the key and value serializers, in our case StringSerializer instances. The first parameter, autoComplete, is a boolean that tells MockProducer to complete all requests immediately. In regular testing, you want to set this parameter to true so that messages are immediately ‘sent’. If you set it to false, you need to explicitly call completeNext() or errorNext(RuntimeException) after calling send(). You would do this to, for example, test the error handling in your producer by passing the exception you want to handle to errorNext.
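To make the autoComplete = false case concrete, here is a minimal sketch of completing and failing a pending send by hand. It assumes the kafka-clients library is on the classpath; the class name ManualCompletionSketch, its helper methods, and the "broker unavailable" message are made up for illustration.

```java
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.concurrent.Future;

// Hypothetical demo class; not part of the original post's code.
final class ManualCompletionSketch {

    private static MockProducer<String, String> manualProducer() {
        // autoComplete = false: every send() stays pending until we resolve it ourselves.
        return new MockProducer<>(false, new StringSerializer(), new StringSerializer());
    }

    /** Sends one record, fails it with errorNext, and returns what the callback received. */
    static Exception failNextSend() {
        MockProducer<String, String> producer = manualProducer();
        Exception[] seenByCallback = new Exception[1];
        producer.send(
                new ProducerRecord<>("transactions_regular_prio", "user1", "{}"),
                (metadata, exception) -> seenByCallback[0] = exception);
        // Resolve the oldest pending send with an error of our choosing.
        producer.errorNext(new RuntimeException("broker unavailable"));
        return seenByCallback[0];
    }

    /** Returns true if the send was pending before completeNext and done afterwards. */
    static boolean completeNextSend() {
        MockProducer<String, String> producer = manualProducer();
        Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>("transactions_regular_prio", "user1", "{}"));
        boolean pendingBefore = !future.isDone();
        // Resolve the oldest pending send successfully.
        producer.completeNext();
        return pendingBefore && future.isDone();
    }

    public static void main(String[] args) {
        System.out.println("callback saw: " + failNextSend());
        System.out.println("completed by hand: " + completeNextSend());
    }
}
```

In a real test you would pass such a MockProducer to the class under test and assert on how it reacts to the failure, for example whether it retries or logs.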
After we’ve created the MockProducer, we create the instance of the class we wish to test.
    TransactionProcessor processor =
            new TransactionProcessor(mockProducer, HIGH_PRIO_TOPIC, REGULAR_PRIO_TOPIC);
Now it's time to test whether topics are selected correctly based on the amount. We will create two Transaction objects, the first with a low amount and the second with an amount higher than our threshold (which is 100,000).
    @Test
    public void testPrioritySelection() {
        Double lowAmount = 50.2d;
        Double highAmount = 250000d;

        Transaction regularPrioTransaction = new Transaction("user1", lowAmount);
        processor.process(regularPrioTransaction);

        Transaction highPrioTransaction = new Transaction("user2", highAmount);
        processor.process(highPrioTransaction);

        assertThat(mockProducer.history()).hasSize(2);

        ProducerRecord<String, String> regTransactionRecord = mockProducer.history().get(0);
        assertThat(regTransactionRecord.value()).contains(lowAmount.toString());
        assertThat(regTransactionRecord.topic()).isEqualTo(REGULAR_PRIO_TOPIC);

        ProducerRecord<String, String> highTransactionRecord = mockProducer.history().get(1);
        assertThat(highTransactionRecord.value()).contains(highAmount.toString());
        assertThat(highTransactionRecord.topic()).isEqualTo(HIGH_PRIO_TOPIC);
    }
After calling processor.process(…) twice, we want to validate that two records were sent to Kafka. For that, we use the MockProducer#history() method, which returns the list of records that the producer was asked to send. We fetch each record from the history to ensure it was ‘sent’ to the proper topic.
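The history can be used for more than topic checks. As a rough sketch, assuming kafka-clients is on the classpath, the snippet below also reads the record keys and resets the history with clear(), which can be handy when one MockProducer is shared between tests. The class HistoryInspectionSketch and its helper methods are hypothetical, and the JSON payloads are hand-written placeholders.

```java
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of the original post's code.
final class HistoryInspectionSketch {

    /** Sends two records through an auto-completing MockProducer and returns it. */
    static MockProducer<String, String> producerWithTwoRecords() {
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        producer.send(new ProducerRecord<>("transactions_regular_prio", "user1", "{\"amount\":50.2}"));
        producer.send(new ProducerRecord<>("transactions_high_prio", "user2", "{\"amount\":250000.0}"));
        return producer;
    }

    /** Collects the record keys in the order the records were sent. */
    static List<String> keysInSendOrder(MockProducer<String, String> producer) {
        List<String> keys = new ArrayList<>();
        for (ProducerRecord<String, String> record : producer.history()) {
            keys.add(record.key());
        }
        return keys;
    }

    public static void main(String[] args) {
        MockProducer<String, String> producer = producerWithTwoRecords();
        System.out.println("keys: " + keysInSendOrder(producer));

        // clear() drops the recorded history so the next test starts from an empty list.
        producer.clear();
        System.out.println("records after clear: " + producer.history().size());
    }
}
```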
Code on GitHub
All code examples from this blog post are available on Coding Harbour’s GitHub.
Would you like to learn more about Kafka?
I have created a Kafka mini-course that you can get absolutely free. Sign up for it over at Coding Harbour.
Photo credit: @paulschnuerle