Brian McClain

Originally published at content.pivotal.io

Getting Started With Spring Cloud Stream

This post was co-written with Ben Wilcock, Product and Technical Marketing Manager for Spring at Pivotal.

🔔 A file has been uploaded! 🔔
🔔 A new user was registered! 🔔
🔔 An order was placed! 🔔

These sound like events that many parts of our application architecture might be interested in, right? For example, when an order is placed on our website, we’ll need a call to process the payment, a call to reserve inventory, and a call to begin the process of picking, packaging and shipping the product.

For a single order, this isn’t too bad. Our store can make a few requests to these backend services directly and it shouldn’t introduce too much overhead. But what happens if we’re really good at selling our product? Processing 100 orders a second suddenly means our frontend is making 300 calls per second to our backend services. If we add one more service to that (say, to report to an internal sales dashboard), now that’s 400 calls per second. That’s a lot of overhead!

What if, instead, our website could simply alert our whole architecture at once? It can yell, “Hey! I made a sale” to our whole stack, and any component that’s interested can take the appropriate action. This means we don’t need to update our frontend as we add additional services, and our new services just need to know what to listen for.
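To make the idea concrete, here is a minimal in-memory sketch of that "yell once, anyone can listen" pattern. It is not Kafka or Spring Cloud Stream; the OrderEvents class and the service names in the messages are hypothetical and only illustrate that the publisher makes one call no matter how many listeners exist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a messaging platform such as Kafka.
class OrderEvents {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    // Any interested service registers itself; the publisher never learns who is listening.
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    // The storefront makes a single call per order, however many services are subscribed.
    void publish(String orderId) {
        for (Consumer<String> listener : listeners) {
            listener.accept(orderId);
        }
    }

    public static void main(String[] args) {
        OrderEvents events = new OrderEvents();
        events.subscribe(id -> System.out.println("payments: charging for " + id));
        events.subscribe(id -> System.out.println("inventory: reserving stock for " + id));
        events.subscribe(id -> System.out.println("shipping: scheduling " + id));
        events.publish("order-42");
    }
}
```

Adding a fourth service, such as the sales dashboard, would be one more subscribe call, with no change to the code that publishes the order.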

Why Spring Cloud Stream?

The above is an example of an event-driven architecture, where instead of reaching out to each service one by one, our services instead emit a change of state. If a file is uploaded, our file service can emit it out to a messaging platform, and then our Super Duper Image Resizer 3000 service can listen for that and automatically generate differently sized profile images. Pivotal’s own Richard Seroter wrote about this very topic in detail, and it’s a great read. In his blog post, Richard talks about messaging as a way of reliably delivering events to many consumers quickly and in volume.

He also touches on something we want to talk about today: Spring Cloud Stream.

We’re big fans of both Kafka and RabbitMQ as event streaming platforms, so for this demo we’ll use Kafka. No matter which you choose to use, making it easy to produce and consume events is important for your developers. I’ve used a lot of frameworks that abstract away from the underlying message queue, but none quite as easy and flexible as Spring Cloud Stream. My teammate Ben Wilcock put together a demo that really shows just how easy it is to get up and running. Let’s take it for a spin—and to follow along, you can download the full source code here.

Prepping For The Demo

We only need a couple of things for our demo, which are Docker and Docker Compose, and of course your favorite distribution of the JDK (perhaps even AdoptOpenJDK, which we sponsor). To keep things easy, the demo includes a Docker Compose config that will set up both Kafka and RabbitMQ, though for our purposes we’ll only be using Kafka. We can spin this up with a simple command:

docker-compose up

This will read our docker-compose.yml file, download the necessary container images, run them, and configure them. After just a few moments, Kafka should be up and running and ready to go.

Sending Events

Our demo is made up of two Spring microservices, one to produce events and one to consume them. In our fictional scenario, the message producer will create a stream of applications for bank loans, and our processor will check if those applications should be approved or declined. Let’s start by producing some messages that will be sent to Kafka, the code for which is in the loansource directory.

There are a few files of code here. The Loan.java file defines a loan object and the Statuses.java file defines all the states a loan can be in. What’s interesting, though, is the LoansourceApplication.java file, which is what’s actually producing our messages. As you can imagine, Spring and its dependencies handle a lot of the wiring up of components for us automatically. Let’s take a look at LoansourceApplication.java to see how this works.

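// Registered as a bean so that Spring Cloud Stream can discover it.
@Bean
public Supplier<Loan> supplyLoan() {
  return () -> {
    // Pick a random applicant name and amount from the names and amounts lists.
    String rName = names.get(new Random().nextInt(names.size()));
    Long rAmount = amounts.get(new Random().nextInt(amounts.size()));
    Loan loan = new Loan(UUID.randomUUID().toString(), rName, rAmount);
    log.info("{} {} for ${} for {}", loan.getStatus(),
             loan.getUuid(), loan.getAmount(), loan.getName());
    // The returned loan is what gets sent as the message.
    return loan;
  };
}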

Supplier<> is one of Java’s built-in functional interfaces (java.util.function.Supplier). Because there is only one @Bean method that returns this type, Spring Cloud Stream knows exactly what to do next. By default, it will trigger this function once every second and send the result to the default MessageChannel named output. What’s nice about this functional approach is that the method only contains business logic, so you can test it using your favorite testing methods.
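As a rough illustration of that testability, the sketch below exercises a similar Supplier with no Spring or Kafka involved. The Loan class here is a simplified, hypothetical stand-in for the demo's Loan.java, the name and amount lists are made up for the example, and the initial PENDING status is an assumption based on the log output shown later.

```java
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Supplier;

class LoanSupplierSketch {
    // Simplified, hypothetical stand-in for the demo's Loan class.
    static class Loan {
        final String uuid;
        final String name;
        final long amount;
        final String status = "PENDING"; // assumed initial status

        Loan(String uuid, String name, long amount) {
            this.uuid = uuid;
            this.name = name;
            this.amount = amount;
        }
    }

    // Illustrative values only; the demo defines its own lists.
    static final List<String> NAMES = List.of("Donald", "Jacinda", "Kim");
    static final List<Long> AMOUNTS = List.of(100L, 1000L, 10000L);

    // Pure business logic: no messaging code, so it can be called directly.
    static Supplier<Loan> supplyLoan(Random random) {
        return () -> new Loan(
                UUID.randomUUID().toString(),
                NAMES.get(random.nextInt(NAMES.size())),
                AMOUNTS.get(random.nextInt(AMOUNTS.size())));
    }

    public static void main(String[] args) {
        Loan loan = supplyLoan(new Random()).get();
        System.out.println(loan.status + " " + loan.uuid
                + " for $" + loan.amount + " for " + loan.name);
    }
}
```

Passing the Random in as a parameter is a choice made for this sketch so that a test can supply a seeded instance; the demo code creates its own.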

We could use the spring.cloud.function.definition property in the application.properties file to explicitly declare which function bean we want bound to the binding destinations, but when only a single function @Bean is defined, this is not necessary. Likewise, if we wanted to use a different poller interval, we could set the spring.integration.poller.fixed-delay property in the application.properties file. The only question that remains is, “How does Spring know it’s Kafka we’re writing to?” For that, we take a look at our pom.xml:
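If "fixed delay" polling is unfamiliar, the idea can be approximated with the plain JDK scheduler. This is only a conceptual sketch and not how Spring Cloud Stream implements its poller; FixedDelayPoller and its poll method are hypothetical names for this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

class FixedDelayPoller {
    // Calls the supplier, hands the result to the sink, waits delayMillis, then repeats.
    static <T> ScheduledExecutorService poll(Supplier<T> source, Consumer<T> sink, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> sink.accept(source.get()), 0, delayMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        // Poll once per second, similar in spirit to the default described above.
        ScheduledExecutorService scheduler =
                poll(() -> "loan application", System.out::println, 1000);
        Thread.sleep(3500);
        scheduler.shutdownNow();
    }
}
```

With a fixed delay, the wait is measured from the end of one invocation to the start of the next, so a slow supplier pushes later polls back instead of causing them to pile up.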

<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Providing this dependency in our code tells Spring, “I’d like to send these messages to Kafka”. Since our Kafka server is listening on localhost on the default port, we don’t need to provide any additional configuration in our application.properties file, but we can of course do so if that’s not the case, providing information such as hostname, port, authentication, etc.

We can run our code and activate the kafka profile, which we’ve configured to be the profile that includes the Spring Cloud Stream (SCS) Kafka binder, and we should see it start producing messages:

cd loansource
./mvnw package spring-boot:run -DskipTests=true -Pkafka

After a few moments, we’ll see our application start creating new loans and sending them to Kafka:

2019-10-15...LoansourceApplication : PENDING 9eff9b58-e1f1-474d-8f1d-aa4db8dbb75a for $10000000 for Donald
2019-10-15...LoansourceApplication : PENDING d507c06c-81bb-4a98-8f85-38f74af36984 for $100 for Jacinda
2019-10-15...LoansourceApplication : PENDING 19fc86a4-d461-470c-8005-423ce1a258e7 for $100 for Jacinda
2019-10-15...LoansourceApplication : PENDING 33f3756c-ea9b-472f-bad2-73f1647188b1 for $10000 for Vladimir
2019-10-15...LoansourceApplication : PENDING 1625d10f-c1c8-4e75-8fe8-10ce363ef56f for $10000000 for Theresa

If you prefer, you can also see the messages in your browser using KafDrop. Simply point your browser to localhost:9000 and you should see a UI that allows you to look at the messages stored in Kafka.

Receiving Events

We’ve got half of the equation here, but we also need something to consume and process these events. For this, we’ll look in the loancheck directory. For this half of the demo, our loan checker will observe every application and approve or decline it. If approved, an approval message will be sent to the approved topic; otherwise, a denial message will be sent to the declined topic. You can extrapolate from here that other systems down the line could listen for and pick up these messages for further processing. For example, maybe a payout system listens for an approved loan to start processing it.

We’ll see the code here is a little different, just pointing to different topics. We see that in LoanCheckApplication.java, we have the @EnableBinding(LoanProcessor.class) annotation, meaning that all of our definitions for channel bindings are found in the LoanProcessor class.

In our LoanProcessor.java file, we’ll see that the channel we’re listening on is named output, matching the default topic our producer writes to. Additionally, we define two other MessageChannels that we’ll be writing to, approved and declined. Each channel is exposed through a method on the interface, annotated with @Input or @Output.

@Component
public interface LoanProcessor {
  String APPLICATIONS_IN = "output";
  String APPROVED_OUT = "approved";
  String DECLINED_OUT = "declined";

  @Input(APPLICATIONS_IN)
  SubscribableChannel sourceOfLoanApplications();

  @Output(APPROVED_OUT)
  MessageChannel approved();

  @Output(DECLINED_OUT)
  MessageChannel declined();
}

Finally, we can see how this ties into which method is invoked if we take a look at the LoanChecker.java file. We’ll see we have a method checkAndSortLoans with the @StreamListener annotation that matches our Input we defined previously:

// Invoked for every message that arrives on the input channel.
@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
  log.info("{} {} for ${} for {}", loan.getStatus(),
           loan.getUuid(), loan.getAmount(), loan.getName());

  // Loans above the limit go to the declined channel; all others are approved.
  if (loan.getAmount() > MAX_AMOUNT) {
    loan.setStatus(Statuses.DECLINED.name());
    processor.declined().send(message(loan));
  } else {
    loan.setStatus(Statuses.APPROVED.name());
    processor.approved().send(message(loan));
  }
}
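The approve-or-decline rule itself is independent of messaging, so it can be sketched and checked as a plain function. In the example below, the LoanSorter class and the MAX_AMOUNT value of 10,000 are assumptions made for illustration; the article doesn't show the demo's actual threshold.

```java
class LoanSorter {
    // Hypothetical threshold; the demo's real MAX_AMOUNT is not shown in this post.
    static final long MAX_AMOUNT = 10_000L;

    enum Statuses { PENDING, APPROVED, DECLINED }

    // Same branch as checkAndSortLoans: strictly above the limit is declined.
    static Statuses decide(long amount) {
        return amount > MAX_AMOUNT ? Statuses.DECLINED : Statuses.APPROVED;
    }

    public static void main(String[] args) {
        System.out.println(decide(1_000L));       // within the assumed limit
        System.out.println(decide(100_000_000L)); // above the assumed limit
    }
}
```

Keeping the decision separate from the channel calls like this is one way to test the rule without starting Kafka.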

We can start this code up much like we did our loansource, by opening up a separate terminal and running the following:

cd loancheck
./mvnw package spring-boot:run -DskipTests=true -Pkafka

After a few moments, we’ll start seeing our pending messages come through and then get sorted into approved or declined:

2019-10-15...LoanChecker : PENDING 95a887cf-ab5f-48c4-b03b-556675446cfc for $1000 for Kim
2019-10-15...LoanChecker : APPROVED 95a887cf-ab5f-48c4-b03b-556675446cfc for $1000 for Kim
2019-10-15...LoanChecker : PENDING a15f13fe-fc9a-40fb-b6f0-24106a18c0cd for $100000000 for Angela
2019-10-15...LoanChecker : DECLINED a15f13fe-fc9a-40fb-b6f0-24106a18c0cd for $100000000 for Angela

Wrapping Up

Spring Cloud Stream provides an extremely powerful abstraction for potentially complicated messaging platforms, turning the act of producing messages into just a couple of lines of code. Should your infrastructure needs change and you need to migrate to a new messaging platform, little more than the binder dependency in your pom file needs to change. No matter if you’re using Kafka, RabbitMQ, or a cloud provider’s solution such as GCP Pub/Sub or Azure Event Hub, Spring Cloud Stream means it’s simple and quick to get up and running.


About the Author
Brian is a Principal Product Marketing Manager on the Technical Marketing team at Pivotal, with a focus on technical educational content for Pivotal customers as well as the Cloud Foundry, BOSH, and Knative communities. Prior to Pivotal, Brian worked on both the development and operations of software, with a heavy focus on Cloud Foundry and BOSH at companies in many industries including finance, entertainment and technology. He loves learning and experimenting in many fields of technology, and more importantly sharing the lessons learned along the way.