DEV Community

Axel
Axel

Posted on • Updated on

Outbox Pattern in Spring Boot 3 and Apache Kafka

Introduction

This post is a brief example to use the Outbox Design Pattern in Spring Boot and Apache Kafka.

Explanation

Here is a short explanation from the great website (microservices.io) :

The solution is for the service that sends the message to first store the message in the database as part of the transaction that updates the business entities. A separate process then sends the messages (outbox entity in JSON format) to the message broker.

Outbox Pattern Architecture

Use cases

Utilize the transactional outbox pattern under the following circumstances:

  • When constructing an event-driven application wherein a database modification triggers an event notification.
  • When aiming to guarantee atomicity across operations involving two separate services.
  • When incorporating the event sourcing pattern into your system architecture.

What do i need ?

IDE (to launch Spring Boot in local mode)

  • IntelliJ IDEA
  • Visual Code

Java (version 21)

  • Spring Boot (3.2.3)
  • Spring Kafka (to communicate with Apache Kafka - principally for the consumer part)
  • Spring Data (for the Rest Controller)
  • Lombok (1.18.30)
  • MapStruct (1.5.5.Final)
  • Postgresql
  • Swagger (springdoc 2.3.0)

Docker

  • Apache Kafka
  • Apache Kafka-ui
  • Zookeeper
  • Postgresql (or H2 if you want to work with in-memory DB)

Starting with Spring Boot Application

Folder Structure

Below is the folder structure with the configuration of services, kafka configuration and controller.

java
    └── com.outbox.pattern
         └── controller/
             ├── OutboxService.java
         └── domain/
             ├── OrderDomain.java
             ├── OutboxEventDomain.java
         └── entity/
             ├── OrderEntity.java
             ├── OutboxEventEntity.java
         └── exception/
             ├── OrderProcessingException.java
         └── integration/
             ├── KafkaProducerConfig.java
         └── repositories/
             ├── OrderRepository.java
             ├── OutboxRepository.java
         └── Services/
             └── orders/
                 ├── OrdersService.java
                 ├── OrdersServiceImpl.java
             └── outbox/
                 ├── OutboxService.java
                 ├── OutboxServiceImpl.java
         └── utils/
                 ├── OrderMapper.java
                 ├── OrderStatus.java
    ├── OutboxPatternApplication.java
resources/
├── application.yaml
Enter fullscreen mode Exit fullscreen mode

Application Spring Boot

OutboxPatternApplication.java is the starter of Spring Boot application

@SpringBootApplication
@EnableScheduling
@EnableJpaAuditing
public class OutboxPatternApplication {
    public static void main(String[] args) {
    SpringApplication.run(OutboxPatternApplication.class, args);
    }
}
Enter fullscreen mode Exit fullscreen mode

Controller

OrderController.java is the Rest Controller to create or cancel an order.
We can call it via the swagger ui (http://localhost:8080/swagger-ui/index.html#/order-controller)

@RestController
@RequestMapping("/order")
public class OrderController {
    private final OrdersService ordersService;

    @Autowired
    public OrderController(OrdersService ordersService) {
        this.ordersService = ordersService;
    }

    // Post an order
    @PostMapping(produces = "application/json")
    public ResponseEntity<UUID> createOrder(@RequestBody @Valid OrderDomain orderDomain) {
        UUID order = ordersService.createOrder(orderDomain);
        return ResponseEntity.status(HttpStatus.CREATED)
                .body(order);
    }

    // Cancel an order
    @DeleteMapping(value = "/{orderUUID}", produces = "application/json")
    public ResponseEntity<Boolean> cancelOrder(@PathVariable @Valid UUID orderUUID) {
        return ResponseEntity.status(HttpStatus.OK)
                .body(ordersService.cancelOrder(orderUUID));
    }
}
Enter fullscreen mode Exit fullscreen mode

Domain

OrderDomain.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderDomain {
    private String product_name;
    private int quantity;
}
Enter fullscreen mode Exit fullscreen mode

OutboxEventDomain.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OutboxEventDomain {
    private OrderStatus eventType;
    private String eventPayload;
}
Enter fullscreen mode Exit fullscreen mode

@Data @AllArgsConstructor @NoArgsConstructor come from the MapStruct. It is used to generate the Getter / Setter and the constructor.

Entity - DB entities

OrderEntity.java

@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "orders")
@EntityListeners(AuditingEntityListener.class)
public class OrderEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    @Column(name = "status", nullable = false)
    @Enumerated(EnumType.STRING)
    private OrderStatus status;

    @Column(name = "product_name", nullable = false)
    private String product_name;

    @Column(name = "quantity", nullable = false)
    private int quantity;

    @Column(name ="created_date", nullable = false)
    @CreatedDate
    private Instant orderDate;
}
Enter fullscreen mode Exit fullscreen mode

@EntityListeners(AuditingEntityListener.class) is used in combinaison with @CreatedDate to retrieve automatically the creation date of the record.

OutboxEventEntity.java

@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "outbox")
public class OutboxEventEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.UUID)
    private UUID id;

    @Column(name = "event_type", nullable = false)
    private OrderStatus eventType;

    @Column(name ="event_payload", nullable = false)
    private String eventPayload;
}
Enter fullscreen mode Exit fullscreen mode

Exception - Exception Handling

OrderProcessingException.java

public class OrderProcessingException extends RuntimeException {
    public OrderProcessingException(String message, String e) {
        super(message);
    }
}
Enter fullscreen mode Exit fullscreen mode

Integration - Integration of Kafka

KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String kafkaHost;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Enter fullscreen mode Exit fullscreen mode

${spring.kafka.bootstrap-servers} this parameter is a property from the application.yaml

Repository - Connection to the DB

OrderRepository.java

public interface OrderRepository extends CrudRepository<OrderEntity, UUID> {
    @Modifying
    @Query("""
                UPDATE OrderEntity o
                   SET o.status = :status
                 WHERE o.id = :id
            """)
    void updateStatus(@Param(value = "id") UUID id,
                      @Param(value = "status") OrderStatus status);
}
Enter fullscreen mode Exit fullscreen mode

OutboxRepository.java

@Repository
public interface OutboxRepository extends CrudRepository<OutboxEventEntity, UUID> {
    //Empty Interface
}

Enter fullscreen mode Exit fullscreen mode

Services - Communication between the Rest Controller and the DB

OrdersService.java

public interface OrdersService {
    UUID createOrder(OrderDomain orderDTO);
    boolean cancelOrder(UUID orderUUID);
}
Enter fullscreen mode Exit fullscreen mode

OrdersServiceImpl.java

@Service
public class OrdersServiceImpl implements OrdersService {
    private final OutboxRepository outboxRepository;
    private final OrderRepository orderRepository;
    private final OrderMapper orderMapper;
    private final ObjectMapper objectMapper;

    @Autowired
    public OrdersServiceImpl(OutboxRepository outboxRepository, OrderRepository orderRepository, OrderMapper orderMapper, ObjectMapper objectMapper) {
        this.outboxRepository = outboxRepository;
        this.orderRepository = orderRepository;
        this.orderMapper = orderMapper;
        this.objectMapper = objectMapper;
    }

    @Override
    @Transactional
    public UUID createOrder(OrderDomain orderDomain) {
        OrderEntity orderEntity = createOrderInDatabase(orderDomain);
        saveOrderEventToDatabase(orderEntity, OrderStatus.CREATED);
        return orderEntity.getId();
    }

    private OrderEntity createOrderInDatabase(OrderDomain orderDomain) {
        try {
            OrderEntity orderEntity = orderMapper.orderEntityToOrderDomain(orderDomain);
            orderEntity.setStatus(OrderStatus.CREATED);
            return orderRepository.save(orderEntity);
        } catch (Exception e) {
            throw new OrderProcessingException("Error processing order creation", e.getMessage());
        }
    }

    private void saveOrderEventToDatabase(OrderEntity orderEntity, OrderStatus eventType) {
        OutboxEventEntity outboxEvent = new OutboxEventEntity();
        try {
            outboxEvent.setEventPayload(objectMapper.writeValueAsString(orderEntity));
            outboxEvent.setEventType(eventType);
            outboxRepository.save(outboxEvent);
        } catch (JsonProcessingException e) {
            throw new OrderProcessingException("Error processing order event creation", e.getMessage());
        }
    }

    @Override
    @Transactional
    public boolean cancelOrder(UUID orderUUID) {
        OrderEntity orderEntity = getOrderEntity(orderUUID);
        orderRepository.updateStatus(orderUUID, OrderStatus.CANCELLED);
        saveOrderEventToDatabase(orderEntity, OrderStatus.CANCELLED);
        return true;
    }

    private OrderEntity getOrderEntity(UUID orderUUID) {
        return orderRepository.findById(orderUUID)
                .orElseThrow(() -> new OrderProcessingException(String.format("Order entity with id %s not found", orderUUID), "NOT_FOUND"));
    }
}
Enter fullscreen mode Exit fullscreen mode

OutboxService.java

public interface OutboxService {
    void eventProcessing();
}
Enter fullscreen mode Exit fullscreen mode

OutboxServiceImpl.java
In this OutboxServiceImpl, we have created two different partition in Kafka to split the message payload for CREATED and CANCELLED.

NOTE: It is not the use case of partition for same domain but it has been added for learning purpose.

@Service
public class OutboxServiceImpl implements OutboxService {
    private static final Logger LOG = LoggerFactory.getLogger(OutboxServiceImpl.class);

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    @Value("${configuration.kafka.outbox-topic}")
    private String outboxTopic;

    @Autowired
    public OutboxServiceImpl(KafkaTemplate<String, String> kafkaTemplate, OutboxRepository outboxRepository, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    @Override
    @Scheduled(fixedRateString = "${configuration.kafka.scheduled}")
    public void eventProcessing() {
        List<OutboxEventEntity> listOfOutboxEventEntities = new ArrayList<>();
        outboxRepository.findAll().forEach(listOfOutboxEventEntities::add);
        LOG.info("Number of outbox events: {}", listOfOutboxEventEntities.size());

        if (!listOfOutboxEventEntities.isEmpty()) {
            for (OutboxEventEntity outboxEventEntity : listOfOutboxEventEntities) {
                LOG.info("Sending event to Kafka");
                String eventType = determineEventType(outboxEventEntity.getEventType());
                if (eventType != null) {
                    sendEventToKafka(eventType, outboxEventEntity);
                }
                outboxRepository.deleteById(outboxEventEntity.getId());
            }
        }
    }

    private String determineEventType(OrderStatus eventType) {
        return switch (eventType) {
            case CREATED -> "CREATED";
            case CANCELLED -> "CANCELLED";
            default -> null;
        };
    }

    private void sendEventToKafka(String eventType, OutboxEventEntity outboxEventEntity) {
        try {
            CompletableFuture<SendResult<String, String>> sendResult = kafkaTemplate.send(outboxTopic, eventType, objectMapper.writeValueAsString(outboxEventEntity));
            SendResult<String, String> result = sendResult.get();
            LOG.info("Partition: {}", result.getRecordMetadata().partition());
        } catch (JsonProcessingException | InterruptedException | ExecutionException e) {
            LOG.error("Error sending event to Kafka: {}", e.getMessage());
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Utils - MapStruct to map entities, domains and OrderStatusEnum

OrderMapper.java

@Mapper(
        componentModel = MappingConstants.ComponentModel.SPRING,
        unmappedTargetPolicy = ReportingPolicy.IGNORE
)
public interface OrderMapper {
    OrderMapper INSTANCE = Mappers.getMapper(OrderMapper.class);

    OrderDomain orderDomainToOrderEntity(OrderEntity orderDomain);
    OrderEntity orderEntityToOrderDomain(OrderDomain orderDomain);
}
Enter fullscreen mode Exit fullscreen mode

OrderStatus.java

public enum OrderStatus {
    CREATED,
    CANCELLED
}
Enter fullscreen mode Exit fullscreen mode

Configuration of Spring Boot and Kafka

application.yaml configuration of Spring Boot and Kafka

spring:
  application:
    name: outbox-pattern
  datasource:
    url: jdbc:postgresql://${YOUR_IP_ADDRESS}:5432/microservices
    username: postgres
    password: postgres
  jpa:
    database: postgresql
    show-sql: true
  kafka:
    bootstrap-servers: ${YOUR_IP_ADDRESS}:9092
    consumer:
      group-id: outbox-group-id
configuration:
  kafka:
    scheduled: 5000
    outbox-topic: outbox-topic
Enter fullscreen mode Exit fullscreen mode

${YOUR_IP_ADDRESS} replace it with your IP

Here is the dependencies in my pom.xml that i have used to generate this example :

<project>
...
    <properties>
        <java.version>21</java.version>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <lombok.version>1.18.30</lombok.version>
        <mapstruct.version>1.5.5.Final</mapstruct.version>
        <springdoc.version>2.3.0</springdoc.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mapstruct</groupId>
            <artifactId>mapstruct</artifactId>
            <version>${mapstruct.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>21</source>
                    <target>21</target>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>${lombok.version}</version>
                        </path>
                        <path>
                            <groupId>org.mapstruct</groupId>
                            <artifactId>mapstruct-processor</artifactId>
                            <version>${mapstruct.version}</version>
                        </path>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok-mapstruct-binding</artifactId>
                            <version>0.2.0</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Enter fullscreen mode Exit fullscreen mode

Postgresql

create table outbox
(
    event_payload varchar not null,
    event_type    varchar not null,
    id            uuid    not null
);
Enter fullscreen mode Exit fullscreen mode
create table orders
(
    id           uuid    not null,
    product_name varchar not null,
    quantity     integer not null,
    created_date timestamp with time zone,
    status       varchar not null
);
Enter fullscreen mode Exit fullscreen mode

Docker

To test all services in local, we are using the docker-compose below to create Kafka-ui, Kafka and Postgresql environment.

You need to have ZooKeeper up and running to have Kafka working.

Copy the file below and run docker compose up in the same folder as the file.

version: "3.9"

services:
  postgresql:
    image: postgres:latest
    container_name: postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=microservices
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - postgres:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zoo:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zoo
    container_name: zoo
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo:2888:3888
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo
    volumes:
      - "./kafka_data:/kafka"
    networks: [ "microservices" ]

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8086:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:19092
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices

# Define named volumes
volumes:
  postgres:
  mongo-data:
    driver: local  
Enter fullscreen mode Exit fullscreen mode

Kafka Administration

In the kafka UI, you have to create a Topic with 2 partitions.

Kafka topic creation

Here is the Kafka topic created with 2 partitions :
Kafka topic created

Access to the application and Kafka

swagger-ui : http://localhost:8080/swagger-ui/index.html#/
kafka UI : http://localhost:8086

Github code

Here is the link to the project : Github page

Going further

Regarding the application, it is a package that have all component in one jar.
It could be also possible to extract the polling service (with Kafka consumer) to a specific package to deploy it for exemple in a specific pod on Kubernetes to achieve the high availability or scalability implementing load balancing and handling failover scenarios.

Error Handling: Enhance error handling in your services. For example, in the cancelOrder method, if the order is not found, you throw a OrderProcessingException, which is good. However, you might want to catch exceptions thrown during Kafka message sending and handle them appropriately, perhaps by logging or retrying.

Security: Ensure that sensitive information such as database passwords or Kafka server configurations are properly secured, especially if you're sharing this code or deploying it in production environments.

Testing: While you have provided the endpoints via Swagger for testing, consider writing unit tests and integration tests to ensure the correctness of your code, especially for critical business logic and error handling scenarios.

Thank you for reading!

If you have any questions, feedback, or suggestions, please feel free to leave them in the comments below. I'm eager to hear from you and respond to your thoughts!

Top comments (0)