👨‍💻 Full list of what has been used:
Spring web framework
Spring WebFlux Reactive REST Services
gRPC Kotlin gRPC
gRPC-Spring-Boot-Starter gRPC Spring Boot Starter
Spring Data R2DBC a specification to integrate SQL databases using reactive drivers
Zipkin open source, end-to-end distributed tracing
Spring Cloud Sleuth autoconfiguration for distributed tracing
Prometheus monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
Kubernetes automating deployment, scaling, and management of containerized applications
Docker and docker-compose
Helm The package manager for Kubernetes
Flywaydb for migrations
You can find the source code in the GitHub repository.
In this project, let's implement a Kotlin gRPC microservice using Spring and PostgreSQL.
gRPC is well suited to low-latency, high-throughput communication, so it's a great fit for microservices where efficiency is critical.
Messages are encoded with Protobuf by default. While Protobuf is efficient to send and receive, its binary format isn't human readable.
Spring doesn't provide a gRPC starter out of the box, so we have to use a community one. The most popular are yidongnan
and LogNet; both are good and ready to use,
and for this project the first one was selected.
As the first step, we have to add the gRPC Kotlin codegen plugin for the Protobuf compiler.
All UIs will be available on the following ports:
Swagger UI: http://localhost:8000/webjars/swagger-ui/index.html
Grafana UI: http://localhost:3000
Zipkin UI: http://localhost:9411
Prometheus UI: http://localhost:9090
Docker-compose file for this project:
version: "3.9"

services:
  microservices_postgresql:
    image: postgres:latest
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=bank_accounts
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./docker_data/microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: microservices_node_exporter
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: microservices_grafana
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "microservices" ]

  zipkin:
    image: openzipkin/zipkin:latest
    restart: always
    container_name: microservices_zipkin
    ports:
      - "9411:9411"
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices
gRPC messages are serialized using Protobuf, an efficient binary message format. It serializes very quickly on the server and client,
and its serialization results in small message payloads, which is important in limited-bandwidth scenarios like mobile apps.
The interface contract specifying the RPC definitions for each service is defined using Protocol Buffers.
Each microservice has its own proto file for this.
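To give a feel for why Protobuf payloads are small, here is a minimal sketch of the base-128 varint encoding that Protobuf uses for integer fields and field tags. The function names are made up for this illustration and are not part of any Protobuf library; real code should rely on the generated classes.

```kotlin
// Illustrative only: hand-rolled Protobuf varint encoding (not a library API).
fun encodeVarint(value: Long): ByteArray {
    require(value >= 0) { "this sketch only handles non-negative values" }
    val out = ArrayList<Byte>()
    var v = value
    // Emit 7 bits at a time, least significant group first;
    // the high bit of each byte signals that more bytes follow.
    while (v >= 0x80L) {
        out.add(((v and 0x7FL) or 0x80L).toByte())
        v = v ushr 7
    }
    out.add(v.toByte())
    return out.toByteArray()
}

// Encodes a non-negative int32 field: tag = (fieldNumber << 3) | wireType,
// where wire type 0 means "varint".
fun encodeInt32Field(fieldNumber: Int, value: Int): ByteArray {
    require(value >= 0) { "this sketch only handles non-negative values" }
    val tag = fieldNumber.toLong() shl 3
    return encodeVarint(tag) + encodeVarint(value.toLong())
}

fun ByteArray.toHex(): String = joinToString(" ") { "%02x".format(it) }

fun main() {
    // A field like `int32 page = 3;` with value 10 takes two bytes on the wire.
    println(encodeInt32Field(3, 10).toHex()) // 18 0a
    println(encodeVarint(300L).toHex())      // ac 02
}
```

The same number sent as JSON text (`"page":10`) would need several times as many bytes, which is where the bandwidth savings come from.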
First we have to define a service in a proto file and compile it. It has mostly unary methods and one server-streaming method:
syntax = "proto3";
package com.example.grpc.bank.service;
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";
service BankAccountService {
rpc createBankAccount (CreateBankAccountRequest) returns (CreateBankAccountResponse);
rpc getBankAccountById (GetBankAccountByIdRequest) returns (GetBankAccountByIdResponse);
rpc depositBalance (DepositBalanceRequest) returns (DepositBalanceResponse);
rpc withdrawBalance (WithdrawBalanceRequest) returns (WithdrawBalanceResponse);
rpc getAllByBalance (GetAllByBalanceRequest) returns (stream GetAllByBalanceResponse);
rpc getAllByBalanceWithPagination(GetAllByBalanceWithPaginationRequest) returns (GetAllByBalanceWithPaginationResponse);
}
message BankAccountData {
string id = 1;
string firstName = 2;
string lastName = 3;
string email = 4;
string address = 5;
string currency = 6;
string phone = 7;
double balance = 8;
string createdAt = 9;
string updatedAt = 10;
}
message CreateBankAccountRequest {
string email = 1;
string firstName = 2;
string lastName = 3;
string address = 4;
string currency = 5;
string phone = 6;
double balance = 7;
}
message CreateBankAccountResponse {
BankAccountData bankAccount = 1;
}
message GetBankAccountByIdRequest {
string id = 1;
}
message GetBankAccountByIdResponse {
BankAccountData bankAccount = 1;
}
message DepositBalanceRequest {
string id = 1;
double balance = 2;
}
message DepositBalanceResponse {
BankAccountData bankAccount = 1;
}
message WithdrawBalanceRequest {
string id = 1;
double balance = 2;
}
message WithdrawBalanceResponse {
BankAccountData bankAccount = 1;
}
message GetAllByBalanceRequest {
double min = 1;
double max = 2;
int32 page = 3;
int32 size = 4;
}
message GetAllByBalanceResponse {
BankAccountData bankAccount = 1;
}
message GetAllByBalanceWithPaginationRequest {
double min = 1;
double max = 2;
int32 page = 3;
int32 size = 4;
}
message GetAllByBalanceWithPaginationResponse {
repeated BankAccountData bankAccount = 1;
int32 page = 2;
int32 size = 3;
int32 totalElements = 4;
int32 totalPages = 5;
bool isFirst = 6;
bool isLast = 7;
}
The actual maven dependencies for gRPC:
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-kotlin-stub</artifactId>
<version>${grpc.kotlin.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${java.grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-kotlin</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
And the maven protobuf plugin:
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${java.grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>grpc-kotlin</id>
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-kotlin</artifactId>
<version>${grpc.kotlin.version}</version>
<classifier>jdk8</classifier>
<mainClass>io.grpc.kotlin.generator.GeneratorRunner</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
</execution>
<execution>
<id>compile-kt</id>
<goals>
<goal>compile-custom</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<outputDirectory>${project.build.directory}/generated-sources/protobuf/kotlin</outputDirectory>
<pluginId>kotlin</pluginId>
</configuration>
</execution>
</executions>
</plugin>
The plugin generates a class for each of your gRPC services.
For example: BankAccountServiceGrpc (and BankAccountServiceGrpcKt from the Kotlin plugin), where BankAccountService is the name of the gRPC service in the proto file.
These classes contain both the client stubs and the server ImplBase that you will need to extend.
After compilation is done, we can implement our gRPC service.
@GrpcService allows us to pass a list of interceptors specific to this service, so we can add LogGrpcInterceptor here.
For request validation, let's use spring-boot-starter-validation, which uses Hibernate Validator.
@GrpcService(interceptors = [LogGrpcInterceptor::class])
class BankAccountGrpcService(
private val bankAccountService: BankAccountService,
private val tracer: Tracer,
private val validator: Validator
) : BankAccountServiceGrpcKt.BankAccountServiceCoroutineImplBase() {
override suspend fun createBankAccount(request: CreateBankAccountRequest): CreateBankAccountResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)
runWithTracing(span) {
bankAccountService.createBankAccount(validate(BankAccount.of(request)))
.let { CreateBankAccountResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it ->
log.info("created bank account: $it").also { span.tag("account", it.toString()) }
}
}
}
}
override suspend fun getBankAccountById(request: GetBankAccountByIdRequest): GetBankAccountByIdResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)
runWithTracing(span) {
bankAccountService.getBankAccountById(UUID.fromString(request.id))
.let { GetBankAccountByIdResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
}
}
}
override suspend fun depositBalance(request: DepositBalanceRequest): DepositBalanceResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(DEPOSIT_BALANCE)
runWithTracing(span) {
bankAccountService.depositAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
.let { DepositBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
}
}
}
override suspend fun withdrawBalance(request: WithdrawBalanceRequest): WithdrawBalanceResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(WITHDRAW_BALANCE)
runWithTracing(span) {
bankAccountService.withdrawAmount(UUID.fromString(request.id), BigDecimal.valueOf(request.balance))
.let { WithdrawBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
.also { it -> log.info("response: $it").also { span.tag("response", it.toString()) } }
}
}
}
override fun getAllByBalance(request: GetAllByBalanceRequest): Flow<GetAllByBalanceResponse> {
runWithTracing(tracer, GET_ALL_BY_BALANCE) {
return bankAccountService.findAllByBalanceBetween(validate(FindByBalanceRequestDto.of(request)))
.map { GetAllByBalanceResponse.newBuilder().setBankAccount(it.toProto()).build() }
.flowOn(Dispatchers.IO + tracer.asContextElement())
}
}
override suspend fun getAllByBalanceWithPagination(request: GetAllByBalanceWithPaginationRequest): GetAllByBalanceWithPaginationResponse =
withContext(tracer.asContextElement()) {
withTimeout(timeOutMillis) {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)
runWithTracing(span) {
bankAccountService.findByBalanceAmount(validate(FindByBalanceRequestDto.of(request)))
.toGetAllByBalanceWithPaginationResponse()
.also { log.info("response: $it") }.also { span.tag("response", it.toString()) }
}
}
}
private fun <T> validate(data: T): T {
return data.run {
val errors = validator.validate(data)
if (errors.isNotEmpty()) throw ConstraintViolationException(errors).also { log.error("validation error: ${it.localizedMessage}") }
data
}
}
companion object {
private val log = LoggerFactory.getLogger(BankAccountGrpcService::class.java)
private const val timeOutMillis = 5000L
private const val CREATE_BANK_ACCOUNT = "BankAccountGrpcService.createBankAccount"
private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountGrpcService.getBankAccountById"
private const val DEPOSIT_BALANCE = "BankAccountGrpcService.depositBalance"
private const val WITHDRAW_BALANCE = "BankAccountGrpcService.withdrawBalance"
private const val GET_ALL_BY_BALANCE = "BankAccountGrpcService.getAllByBalance"
private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountGrpcService.getAllByBalanceWithPagination"
}
}
fun Page<BankAccount>.toGetAllByBalanceWithPaginationResponse(): GetAllByBalanceWithPaginationResponse {
return GetAllByBalanceWithPaginationResponse
.newBuilder()
.setIsFirst(this.isFirst)
.setIsLast(this.isLast)
.setTotalElements(this.totalElements.toInt())
.setTotalPages(this.totalPages)
.setPage(this.pageable.pageNumber)
.setSize(this.pageable.pageSize)
.addAllBankAccount(this.content.map { it.toProto() })
.build()
}
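The proto messages carry `balance` as a `double`, and the service converts it with `BigDecimal.valueOf(...)`. Here is a small sketch of why that conversion is the safer choice over the `BigDecimal(double)` constructor when amounts arrive as doubles. It uses only the JDK, and `toMoney` is a hypothetical helper name, not something from the project.

```kotlin
import java.math.BigDecimal

// Hypothetical helper: converts a proto double into a BigDecimal
// via its short decimal representation.
fun toMoney(amount: Double): BigDecimal = BigDecimal.valueOf(amount)

fun main() {
    // Binary floating point cannot represent 0.1 exactly.
    println(0.1 + 0.2 == 0.3) // false
    // The constructor exposes the exact binary value of the double ...
    println(BigDecimal(0.1))
    // ... while valueOf goes through Double.toString and yields the short decimal form.
    println(toMoney(0.1)) // 0.1
    println(toMoney(0.1) + toMoney(0.2) == BigDecimal("0.3")) // true
}
```

Converting once at the service boundary and doing all arithmetic in `BigDecimal` keeps rounding errors out of the balance calculations.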
Interceptors are a gRPC concept that allows apps to interact with incoming or outgoing gRPC calls.
They offer a way to enrich the request processing pipeline.
We can add our own gRPC interceptors; here we implement LogGrpcInterceptor:
class LogGrpcInterceptor : ServerInterceptor {
override fun <ReqT : Any?, RespT : Any?> interceptCall(
call: ServerCall<ReqT, RespT>,
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> {
log.info("Service: ${call.methodDescriptor.serviceName}, Method: ${call.methodDescriptor.bareMethodName}, Headers: $headers")
return next.startCall(call, headers)
}
companion object {
private val log = LoggerFactory.getLogger(LogGrpcInterceptor::class.java)
}
}
and register it as a global interceptor with @GrpcGlobalServerInterceptor:
@Configuration(proxyBeanMethods = false)
class GlobalInterceptorConfiguration {
@GrpcGlobalServerInterceptor
fun logServerInterceptor(): LogGrpcInterceptor? = LogGrpcInterceptor()
}
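Conceptually, an interceptor wraps the next handler in the pipeline, which is why it can log, enrich, or reject a call before the service method runs. The following library-free sketch illustrates that idea; `Call`, `Handler`, `Interceptor`, and `chain` are hypothetical stand-ins for illustration and are not gRPC types.

```kotlin
// Hypothetical stand-ins for illustration; these are not gRPC classes.
data class Call(val method: String, val headers: Map<String, String>)

typealias Handler = (Call) -> String

fun interface Interceptor {
    fun intercept(call: Call, next: Handler): String
}

// Wraps the handler so that the first interceptor in the list runs first.
fun chain(interceptors: List<Interceptor>, handler: Handler): Handler =
    interceptors.foldRight(handler) { interceptor, next ->
        { call: Call -> interceptor.intercept(call, next) }
    }

fun main() {
    val events = mutableListOf<String>()

    // Logs every call and always passes it on.
    val logging = Interceptor { call, next ->
        events += "log ${call.method}"
        next(call)
    }
    // Short-circuits the pipeline when a header is missing.
    val auth = Interceptor { call, next ->
        if ("authorization" in call.headers) next(call) else "UNAUTHENTICATED"
    }
    val service: Handler = { call -> "OK ${call.method}" }
    val pipeline = chain(listOf(logging, auth), service)

    println(pipeline(Call("getBankAccountById", mapOf("authorization" to "token")))) // OK getBankAccountById
    println(pipeline(Call("getBankAccountById", emptyMap())))                        // UNAUTHENTICATED
    println(events) // [log getBankAccountById, log getBankAccountById]
}
```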
The service layer of the microservice has a few methods. For working with lists of data, for example, it has two methods:
one returns a PageImpl used in the unary method response, and the other returns a Flow for the gRPC streaming response method.
The current Spring version supports the @Transactional annotation with R2DBC.
The interface and implementation are below:
@Service
interface BankAccountService {
suspend fun createBankAccount(bankAccount: BankAccount): BankAccount
suspend fun getBankAccountById(id: UUID): BankAccount
suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount
suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount
fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount>
suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount>
}
@Service
class BankAccountServiceImpl(
private val bankRepository: BankRepository,
private val tracer: Tracer
) : BankAccountService {
@Transactional
override suspend fun createBankAccount(@Valid bankAccount: BankAccount): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(CREATE_BANK_ACCOUNT)
runWithTracing(span) {
bankRepository.save(bankAccount).also { span.tag("saved account", it.toString()) }
}
}
@Transactional(readOnly = true)
override suspend fun getBankAccountById(id: UUID): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(GET_BANK_ACCOUNT_BY_ID)
runWithTracing(span) {
bankRepository.findById(id).also { span.tag("bank account", it.toString()) }
?: throw BankAccountNotFoundException(id.toString())
}
}
@Transactional
override suspend fun depositAmount(id: UUID, amount: BigDecimal): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(DEPOSIT_AMOUNT)
runWithTracing(span) {
bankRepository.findById(id)
?.let { bankRepository.save(it.depositAmount(amount)) }
.also { span.tag("bank account", it.toString()) }
?: throw BankAccountNotFoundException(id.toString())
}
}
@Transactional
override suspend fun withdrawAmount(id: UUID, amount: BigDecimal): BankAccount =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(WITHDRAW_AMOUNT)
runWithTracing(span) {
bankRepository.findById(id)
?.let { bankRepository.save(it.withdrawAmount(amount)) }
.also { span.tag("bank account", it.toString()) }
?: throw BankAccountNotFoundException(id.toString())
}
}
@Transactional(readOnly = true)
override fun findAllByBalanceBetween(requestDto: FindByBalanceRequestDto): Flow<BankAccount> {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE)
runWithTracing(span) {
return bankRepository.findAllByBalanceBetween(
requestDto.minBalance,
requestDto.maxBalance,
requestDto.pageable
)
}
}
@Transactional(readOnly = true)
override suspend fun findByBalanceAmount(requestDto: FindByBalanceRequestDto): Page<BankAccount> =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_WITH_PAGINATION)
runWithTracing(span) {
bankRepository.findByBalanceAmount(requestDto.minBalance, requestDto.maxBalance, requestDto.pageable)
.also { span.tag("pagination", it.toString()) }
}
}
companion object {
private const val CREATE_BANK_ACCOUNT = "BankAccountService.createBankAccount"
private const val GET_BANK_ACCOUNT_BY_ID = "BankAccountService.getBankAccountById"
private const val DEPOSIT_AMOUNT = "BankAccountService.depositAmount"
private const val WITHDRAW_AMOUNT = "BankAccountService.withdrawAmount"
private const val GET_ALL_BY_BALANCE = "BankAccountService.findAllByBalanceBetween"
private const val GET_ALL_BY_BALANCE_WITH_PAGINATION = "BankAccountService.findByBalanceAmount"
}
}
R2DBC is a specification that provides reactive, non-blocking APIs for relational databases.
Using this, you can have your reactive APIs in Spring Boot read and write information to the database in a reactive/asynchronous way.
The BankRepository is a combination of CoroutineSortingRepository from Spring Data and our custom BankPostgresRepository implementation.
The custom BankPostgresRepository implementation uses R2dbcEntityTemplate and DatabaseClient.
If we want the same pagination response that JPA provides,
we have to create the PageImpl manually.
@Repository
interface BankRepository : CoroutineSortingRepository<BankAccount, UUID>, BankPostgresRepository {
fun findAllByBalanceBetween(min: BigDecimal, max: BigDecimal, pageable: Pageable): Flow<BankAccount>
}
@Repository
interface BankPostgresRepository {
suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount>
}
@Repository
class BankPostgresRepositoryImpl(
private val template: R2dbcEntityTemplate,
private val databaseClient: DatabaseClient,
private val tracer: Tracer,
) : BankPostgresRepository {
override suspend fun findByBalanceAmount(min: BigDecimal, max: BigDecimal, pageable: Pageable): Page<BankAccount> =
withContext(Dispatchers.IO + tracer.asContextElement()) {
val span = tracer.startScopedSpan(GET_ALL_BY_BALANCE_AMOUNT)
val query = Query.query(Criteria.where(BALANCE).between(min, max))
runWithTracing(span) {
val accountsList = async {
template.select(query.with(pageable), BankAccount::class.java)
.asFlow()
.toList()
}
val totalCount = async {
databaseClient.sql("SELECT count(bank_account_id) as total FROM microservices.bank_accounts WHERE balance BETWEEN :min AND :max")
.bind("min", min)
.bind("max", max)
.fetch()
.one()
.awaitFirst()
}
PageImpl(accountsList.await(), pageable, totalCount.await()["total"] as Long)
.also { span.tag("pagination", it.toString()) }
.also { log.debug("pagination: $it") }
}
}
companion object {
private val log = LoggerFactory.getLogger(BankPostgresRepositoryImpl::class.java)
private const val GET_ALL_BY_BALANCE_AMOUNT = "BankPostgresRepository.findByBalanceAmount"
}
}
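The metadata exposed by the paginated response (`totalPages`, `isFirst`, `isLast`) follows from the page number, the page size, and the total count returned by the `count` query. Below is a simplified sketch of that arithmetic, assuming a zero-based page index as in `PageRequest.of(page, size)`. `PageInfo` and `pageInfo` are illustrative names, not Spring Data types, and the sketch ignores the edge-case adjustments `PageImpl` may apply.

```kotlin
// Illustrative names; not Spring Data types.
data class PageInfo(
    val totalPages: Int,
    val isFirst: Boolean,
    val isLast: Boolean,
)

fun pageInfo(page: Int, size: Int, totalElements: Long): PageInfo {
    require(page >= 0) { "page must not be negative" }
    require(size > 0) { "size must be positive" }
    require(totalElements >= 0) { "totalElements must not be negative" }
    // Ceiling division: 25 elements with size 10 need 3 pages.
    val totalPages = ((totalElements + size - 1) / size).toInt()
    return PageInfo(
        totalPages = totalPages,
        isFirst = page == 0,
        // The last page is reached when no further page exists.
        isLast = page + 1 >= totalPages,
    )
}

fun main() {
    println(pageInfo(page = 0, size = 10, totalElements = 25)) // PageInfo(totalPages=3, isFirst=true, isLast=false)
    println(pageInfo(page = 2, size = 10, totalElements = 25)) // PageInfo(totalPages=3, isFirst=false, isLast=true)
}
```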
For error handling, the gRPC starter provides @GrpcAdvice, which marks a class to be checked for exception handling methods.
@GrpcExceptionHandler marks the annotated method to be executed when the specified exception is thrown.
The status codes are well described in the gRPC documentation.
@GrpcAdvice
class GrpcExceptionAdvice {
@GrpcExceptionHandler(RuntimeException::class)
fun handleRuntimeException(ex: RuntimeException): StatusException {
val status = Status.INTERNAL.withDescription(ex.message).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
@GrpcExceptionHandler(BankAccountNotFoundException::class)
fun handleBankAccountNotFoundException(ex: BankAccountNotFoundException): StatusException {
val status = Status.INVALID_ARGUMENT.withDescription(ex.message).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
@GrpcExceptionHandler(MethodArgumentNotValidException::class)
fun handleMethodArgumentNotValidException(ex: MethodArgumentNotValidException): StatusException {
val errorMap: MutableMap<String, String> = HashMap()
ex.bindingResult.fieldErrors.forEach { error -> error.defaultMessage?.let { errorMap[error.field] = it } }
val status = Status.INVALID_ARGUMENT.withDescription(errorMap.toString()).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
@GrpcExceptionHandler(ConstraintViolationException::class)
fun handleConstraintViolationException(ex: ConstraintViolationException): StatusException {
val status = Status.INVALID_ARGUMENT.withDescription(ex.toString()).withCause(ex)
return status.asException().also { log.error("status: $status") }
}
companion object {
private val log = LoggerFactory.getLogger(GrpcExceptionAdvice::class.java)
}
}
Our microservice also has an HTTP controller:
@Tag(name = "BankAccount", description = "Bank Account REST Controller")
@RestController
@RequestMapping(path = ["/api/v1/bank"])
class BankAccountController(private val bankAccountService: BankAccountService) {
@PostMapping(produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "createBankAccount",
summary = "Create new bank account",
operationId = "createBankAccount",
description = "Create new bank account for user"
)
suspend fun createBankAccount(@Valid @RequestBody req: CreateBankAccountDto) =
withTimeout(timeOutMillis) {
ResponseEntity
.status(HttpStatus.CREATED)
.body(bankAccountService.createBankAccount(BankAccount.of(req)).toSuccessHttpResponse())
.also { log.info("created bank account: $it") }
}
@PutMapping(path = ["/deposit/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "depositBalance",
summary = "Deposit balance",
operationId = "depositBalance",
description = "Deposit given amount to the bank account balance"
)
suspend fun depositBalance(
@PathVariable("id") id: UUID,
@Valid @RequestBody depositBalanceDto: DepositBalanceDto
) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.depositAmount(id, depositBalanceDto.amount).toSuccessHttpResponse())
.also { log.info("response: $it") }
}
@PutMapping(path = ["/withdraw/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "withdrawBalance",
summary = "Withdraw balance",
operationId = "withdrawBalance",
description = "Withdraw given amount from the bank account balance"
)
suspend fun withdrawBalance(
@PathVariable("id") id: UUID,
@Valid @RequestBody withdrawBalanceDto: WithdrawBalanceDto
) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.withdrawAmount(id, withdrawBalanceDto.amount).toSuccessHttpResponse())
.also { log.info("response: $it") }
}
@GetMapping(path = ["{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "getBankAccountById",
summary = "Get bank account by id",
operationId = "getBankAccountById",
description = "Get user bank account by given id"
)
suspend fun getBankAccountById(@PathVariable(required = true) id: UUID) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.getBankAccountById(id).toSuccessHttpResponse())
.also { log.info("success get bank account: $it") }
}
@GetMapping(path = ["all/balance"], produces = [MediaType.APPLICATION_JSON_VALUE])
@Operation(
method = "findAllAccountsByBalance",
summary = "Find all bank account with given amount range",
operationId = "findAllAccounts",
description = "Find all bank accounts for the given balance range with pagination"
)
suspend fun findAllAccountsByBalance(
@RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
@RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
@RequestParam(name = "page", defaultValue = "0") page: Int,
@RequestParam(name = "size", defaultValue = "10") size: Int,
) = withTimeout(timeOutMillis) {
ResponseEntity.ok(bankAccountService.findByBalanceAmount(FindByBalanceRequestDto(min, max, PageRequest.of(page, size))))
.also { log.info("response: $it") }
}
@GetMapping(path = ["all/balance/stream"])
@Operation(
method = "getAllByBalanceStream",
summary = "Find all bank account with given amount range returns stream",
operationId = "getAllByBalanceStream",
description = "Find all bank accounts for the given balance range"
)
fun getAllByBalanceStream(
@RequestParam(name = "min", defaultValue = "0") min: BigDecimal,
@RequestParam(name = "max", defaultValue = "500000000") max: BigDecimal,
@RequestParam(name = "page", defaultValue = "0") page: Int,
@RequestParam(name = "size", defaultValue = "10") size: Int,
): Flow<SuccessBankAccountResponse> {
return bankAccountService.findAllByBalanceBetween(FindByBalanceRequestDto(min, max, PageRequest.of(page, size)))
.map { it -> it.toSuccessHttpResponse().also { log.info("response: $it") } }
}
companion object {
private val log = LoggerFactory.getLogger(BankAccountController::class.java)
private const val timeOutMillis = 5000L
}
}
A few UI clients are available for working with gRPC; personally, I like to use BloomRPC.
Other useful tools are grpcurl and grpcui.
As the next step, let's deploy our microservice to k8s.
We can build a Docker image in different ways; this example uses a simple multi-stage Dockerfile:
FROM --platform=linux/arm64 azul/zulu-openjdk-alpine:17 as builder
ARG JAR_FILE=target/KotlinSpringGrpc-0.0.1-SNAPSHOT.jar
COPY ${JAR_FILE} application.jar
RUN java -Djarmode=layertools -jar application.jar extract
FROM azul/zulu-openjdk-alpine:17
COPY --from=builder dependencies/ ./
COPY --from=builder snapshot-dependencies/ ./
COPY --from=builder spring-boot-loader/ ./
COPY --from=builder application/ ./
ENTRYPOINT ["java", "-XX:MaxRAMPercentage=75", "-XX:+UseG1GC", "org.springframework.boot.loader.JarLauncher"]
For working with k8s I like to use Helm. The chart for the microservice is simple and consists of the Deployment itself, a Service, a ConfigMap,
and a ServiceMonitor.
The last one is required because monitoring uses the kube-prometheus-stack Helm chart.
The microservice Helm chart YAML file is:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.microservice.name }}
  labels:
    app: {{ .Values.microservice.name }}
spec:
  replicas: {{ .Values.microservice.replicas }}
  template:
    metadata:
      name: {{ .Values.microservice.name }}
      labels:
        app: {{ .Values.microservice.name }}
    spec:
      containers:
        - name: {{ .Values.microservice.name }}
          image: {{ .Values.microservice.image }}
          imagePullPolicy: Always
          resources:
            requests:
              memory: {{ .Values.microservice.resources.requests.memory }}
              cpu: {{ .Values.microservice.resources.requests.cpu }}
            limits:
              memory: {{ .Values.microservice.resources.limits.memory }}
              cpu: {{ .Values.microservice.resources.limits.cpu }}
          livenessProbe:
            httpGet:
              port: {{ .Values.microservice.livenessProbe.httpGet.port }}
              path: {{ .Values.microservice.livenessProbe.httpGet.path }}
            initialDelaySeconds: {{ .Values.microservice.livenessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.microservice.livenessProbe.periodSeconds }}
          readinessProbe:
            httpGet:
              port: {{ .Values.microservice.readinessProbe.httpGet.port }}
              path: {{ .Values.microservice.readinessProbe.httpGet.path }}
            initialDelaySeconds: {{ .Values.microservice.readinessProbe.initialDelaySeconds }}
            periodSeconds: {{ .Values.microservice.readinessProbe.periodSeconds }}
          ports:
            - containerPort: {{ .Values.microservice.ports.http.containerPort }}
              name: {{ .Values.microservice.ports.http.name }}
            - containerPort: {{ .Values.microservice.ports.grpc.containerPort }}
              name: {{ .Values.microservice.ports.grpc.name }}
          env:
            - name: SPRING_APPLICATION_NAME
              value: microservice_k8s
            - name: JAVA_OPTS
              value: "-XX:+UseG1GC -XX:MaxRAMPercentage=75"
            - name: SERVER_PORT
              valueFrom:
                configMapKeyRef:
                  key: server_port
                  name: {{ .Values.microservice.name }}-config-map
            - name: GRPC_SERVER_PORT
              valueFrom:
                configMapKeyRef:
                  key: grpc_server_port
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_ZIPKIN_BASE_URL
              valueFrom:
                configMapKeyRef:
                  key: zipkin_base_url
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_R2DBC_URL
              valueFrom:
                configMapKeyRef:
                  key: r2dbc_url
                  name: {{ .Values.microservice.name }}-config-map
            - name: SPRING_FLYWAY_URL
              valueFrom:
                configMapKeyRef:
                  key: flyway_url
                  name: {{ .Values.microservice.name }}-config-map
      restartPolicy: Always
      terminationGracePeriodSeconds: {{ .Values.microservice.terminationGracePeriodSeconds }}
  selector:
    matchLabels:
      app: {{ .Values.microservice.name }}
---
apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.microservice.name }}-service
  labels:
    app: {{ .Values.microservice.name }}
spec:
  selector:
    app: {{ .Values.microservice.name }}
  ports:
    - port: {{ .Values.microservice.service.httpPort }}
      name: http
      protocol: TCP
      targetPort: http
    - port: {{ .Values.microservice.service.grpcPort }}
      name: grpc
      protocol: TCP
      targetPort: grpc
  type: ClusterIP
---
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    release: monitoring
  name: {{ .Values.microservice.name }}-service-monitor
  namespace: default
spec:
  selector:
    matchLabels:
      app: {{ .Values.microservice.name }}
  endpoints:
    - interval: 10s
      port: http
      path: /actuator/prometheus
  namespaceSelector:
    matchNames:
      - default
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.microservice.name }}-config-map
data:
  server_port: "8080"
  grpc_server_port: "8000"
  zipkin_base_url: zipkin:9411
  r2dbc_url: "r2dbc:postgresql://postgres:5432/bank_accounts"
  flyway_url: "jdbc:postgresql://postgres:5432/bank_accounts"
and the values.yaml file:
microservice:
  name: kotlin-spring-microservice
  image: alexanderbryksin/kotlin_spring_grpc_microservice:latest
  replicas: 1
  livenessProbe:
    httpGet:
      port: 8080
      path: /actuator/health/liveness
    initialDelaySeconds: 60
    periodSeconds: 5
  readinessProbe:
    httpGet:
      port: 8080
      path: /actuator/health/readiness
    initialDelaySeconds: 60
    periodSeconds: 5
  ports:
    http:
      name: http
      containerPort: 8080
    grpc:
      name: grpc
      containerPort: 8000
  terminationGracePeriodSeconds: 20
  service:
    httpPort: 8080
    grpcPort: 8000
  resources:
    requests:
      memory: '6000Mi'
      cpu: "3000m"
    limits:
      memory: '6000Mi'
      cpu: "3000m"
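As a rough sanity check of the settings above: with `-XX:MaxRAMPercentage=75` in effect and a container memory limit of `6000Mi`, the JVM would size its maximum heap at about 4500 MiB, assuming it detects the container limit (the default on recent JDKs). The helper below is only illustrative arithmetic with a made-up name, not a JVM API.

```kotlin
// Illustrative arithmetic only: expected max heap for a given container limit.
fun maxHeapMiB(containerLimitMiB: Long, maxRamPercentage: Double): Long =
    (containerLimitMiB * maxRamPercentage / 100.0).toLong()

fun main() {
    println(maxHeapMiB(6000, 75.0)) // 4500
    // Inside the running container, the effective value can be inspected with:
    println(Runtime.getRuntime().maxMemory() / (1024 * 1024))
}
```

The remaining quarter of the limit is left for metaspace, thread stacks, and direct buffers, which matters for Netty-based gRPC and WebFlux servers.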
As a UI tool for working with k8s, I personally like to use Lens.
You can find more details and the source code of the full project in the GitHub repository.
Of course, in real-world projects the business logic and infrastructure code are much more complicated, and we have to implement many more necessary features.
I hope this article is useful and helpful. I'll be happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)