DEV Community

Alexander
Alexander

Posted on

Kotlin Clean Architecture and CQRS πŸš€πŸ‘‹πŸ‘¨β€πŸ’»

Arch

In this article we will implement the microservice using Clean Architecture and CQRS using Kotlin, Spring WebFlux with coroutines, Postgres and MongoDB, Kafka as a message broker, and Arrow-kt functional library which like the documentation says brings idiomatic functional programming to Kotlin. The source code is here at my GitHub, please like it if it was helpfully for you.

Clean Architecture is one of the more popular software design approaches, it follows the principles of Dependency Inversion, Single Responsibility, and Separation of Concerns. It consists of concentric circles representing different
layers, with the innermost layer being the most abstract and the outermost layer representing the user interface and
infrastructure. By separating the concerns of the various components and enforcing the dependency rule, it becomes much
easier to understand and modify the code. Depending on abstractions allows you to design your business logic flexibly, without having to know the implementation details. The domain layer and the application layer are the core of the Clean Architecture. These two layers together form the application core, encapsulating the most important business rules of the system. Clean Architecture is a domain-centric architectural approach that separates business logic from technical implementation details.

CQRS_img

CQRS stands for Command and QueryResponsibility Segregation, a pattern that separates reads and writes into different models, using commands to
update data, and queries to read data. Using CQRS, you should have a strict separation between the write model and the
read model. Those two models should be processed by separate objects and not be conceptually linked together. Those
objects are not physical storage structures but are, for example, command handlers and query handlers. They’re not
related to where and how the data will be stored. They’re connected to the processing behavior. Command handlers are
responsible for handling commands, mutating state, or doing other side effects. Query handlers are responsible for
returning the result of the requested query. It gives us: Scalability - allows for independent scaling of read and
write operations. Performance - By separating read and write operations, you can optimize each for performance.
Reads can be optimized for fast retrieval by using denormalized data structures, caching, and specialized read models tailored to specific query needs. Flexibility - allows us to model your read and write sides of the application differently, which provides flexibility in designing the data structures and processing logic to best suit the requirements of each operation. This flexibility can lead to a more efficient and maintainable system, especially in complex domains where the read and write requirements differ significantly. One of the common misconceptions about CQRS is that the commands and queries should be run on separate databases. This isn’t necessarily true; only that the behaviors and responsibilities for both should be separated. This can be within the code, within the structure of the database, or (if the situation calls for it), different databases.

Nothing in an inner circle can know anything at all about something in an outer circle. In particular, the name of
something declared in an outer circle must not be mentioned by the code in the inner circle. That includes functions and
classes. variables, or any other named software entity. In the real world understanding clean architecture can vary from
person to person. Since clean architecture emphasizes principles such as separation of concerns, dependency inversion, and abstraction layers, different developers may interpret and implement these principles differently based on their own experiences, knowledge, and project requirements. This article shows my personal view of one of the possible
implementations ways. Ultimately, the goal of clean architecture is to create software systems that are maintainable, scalable, and easy to understand.

Clean Architecture

The Presentation layer (named api here) is most outside layer and the entry point to our system. The most
important part of the Presentation layer is the Controllers, which define the API endpoints in our system presented to
the outside world and responsible for:

  • Handling interaction with the outside world
  • Presenting, displaying or returning responses with the data
  • Translating the outside requests data (map requests to application layer commands)
  • Works with framework-specific configuration setup
  • Works on top of the application layer

api_layer

Let's look at the full way of command requests in the microservice. First things first, it accepts REST HTTP requests, validates input, if it's secured checks credentials, etc., then maps the request to the DTO the command and calls AccountCommandService handle method. For example, let's look at creating new account and deposit balance commands methods call flow:

swagger

swagger_create_response



@Tag(name = "Accounts", description = "Account domain REST endpoints")
@RestController
@RequestMapping(path = ["/api/v1/accounts"])
class AccountController(
    private val accountCommandService: AccountCommandService,
    private val accountQueryService: AccountQueryService
) {

    @Operation(
        method = "createAccount", operationId = "createAccount", description = "Create new Account",
        responses = [
            ApiResponse(
                description = "Create new Account",
                responseCode = "201",
                content = [Content(
                    mediaType = MediaType.APPLICATION_JSON_VALUE,
                    schema = Schema(implementation = AccountId::class)
                )]
            ),
            ApiResponse(
                description = "bad request response",
                responseCode = "400",
                content = [Content(schema = Schema(implementation = ErrorHttpResponse::class))]
            )],
    )
    @PostMapping
    suspend fun createAccount(
        @Valid @RequestBody request: CreateAccountRequest
    ): ResponseEntity<out Any> = eitherScope(ctx) {
        accountCommandService.handle(request.toCommand()).bind()
    }.fold(
        ifLeft = { mapErrorToResponse(it) },
        ifRight = { ResponseEntity.status(HttpStatus.CREATED).body(it) }
    )

    @Operation(
        method = "depositBalance", operationId = "depositBalance", description = "Deposit balance",
        responses = [
            ApiResponse(
                description = "Deposit balance",
                responseCode = "200",
                content = [Content(
                    mediaType = MediaType.APPLICATION_JSON_VALUE,
                    schema = Schema(implementation = BaseResponse::class)
                )]
            ),
            ApiResponse(
                description = "bad request response",
                responseCode = "400",
                content = [Content(schema = Schema(implementation = ErrorHttpResponse::class))]
            )],
    )
    @PutMapping(path = ["/{id}/deposit"])
    suspend fun depositBalance(
        @PathVariable id: UUID,
        @Valid @RequestBody request: DepositBalanceRequest
    ): ResponseEntity<out Any> = eitherScope(ctx) {
        accountCommandService.handle(request.toCommand(AccountId(id))).bind()
    }.fold(
        ifLeft = { mapErrorToResponse(it) },
        ifRight = { okResponse(it) }
    )
}


Enter fullscreen mode Exit fullscreen mode

The Application layer contains the use cases of the application. A use case represents a specific interaction or
action that the system can perform. Each use case is implemented as a command or a query. It is part of the whole
application core like a Domain layer and is responsible for:

  • Executing the application use cases (all the actions and commands allowed to do with the system)
  • Fetch domain objects
  • Manipulating domain objects

Application layer

The application layer AccountCommandService has the business logic, which runs required business rules validations,
then applies changes to the domain aggregate, persists domain objects in the database, produces the domain events, and
persists them in the outbox table within one single transaction. The current application used some not-required small
optimization for outbox publishing, after the command service commits the transaction, we publish the event, but we
don't care if this publish fails, because of polling publisher realized as Spring scheduler anyway will process it.

Arrow greatly improves developer experience because
Kotlin doesn’t ship Either type
with the standard SDK. Either is an entity whose value can be of two different types, called left and right.
By convention, the Right is for the success case and
the Left for the error one
. It allows us to express the fact that a call might return a correct value or an error, and
differentiate between the two of them. The Left/Right naming pattern is just a convention. Either is a great way to make
the error handling in your code more explicit. Making the code more explicit reduces the amount of context that you need
to keep in your head, which in turn makes the code easier to understand.



interface AccountCommandService {

    suspend fun handle(command: CreateAccountCommand): Either<AppError, AccountId>

    suspend fun handle(command: ChangeAccountStatusCommand): Either<AppError, Unit>

    suspend fun handle(command: ChangeContactInfoCommand): Either<AppError, Unit>

    suspend fun handle(command: DepositBalanceCommand): Either<AppError, Unit>

    suspend fun handle(command: WithdrawBalanceCommand): Either<AppError, Unit>

    suspend fun handle(command: UpdatePersonalInfoCommand): Either<AppError, Unit>
}

@Service
class AccountCommandServiceImpl(
    private val accountRepository: AccountRepository,
    private val outboxRepository: OutboxRepository,
    private val tx: TransactionalOperator,
    private val eventPublisher: EventPublisher,
    private val serializer: Serializer,
    private val emailVerifierClient: EmailVerifierClient,
    private val paymentClient: PaymentClient
) : AccountCommandService {

    override suspend fun handle(command: CreateAccountCommand): Either<AppError, AccountId> = eitherScope(ctx) {
        emailVerifierClient.verifyEmail(command.contactInfo.email).bind()

        val (account, event) = tx.executeAndAwait {
            val account = accountRepository.save(command.toAccount()).bind()
            val event = outboxRepository.insert(account.toAccountCreatedOutboxEvent(serializer)).bind()
            account to event
        }

        publisherScope.launch { publishOutboxEvent(event) }
        account.accountId
    }

    override suspend fun handle(command: DepositBalanceCommand): Either<AppError, Unit> = eitherScope(ctx) {
        paymentClient.verifyPaymentTransaction(command.accountId.string(), command.transactionId).bind()

        val event = tx.executeAndAwait {
            val foundAccount = accountRepository.getById(command.accountId).bind()
            foundAccount.depositBalance(command.balance).bind()

            val account = accountRepository.update(foundAccount).bind()
            val event = account.toBalanceDepositedOutboxEvent(command.balance, serializer)
            outboxRepository.insert(event).bind()
        }

        publisherScope.launch { publishOutboxEvent(event) }
    }
}


Enter fullscreen mode Exit fullscreen mode

Domain layer encapsulates the most important business rules of the system, it is the place where we have to start
building core business rules, at the Domain-centric architecture, we start developing from the domain.The
responsibilities of the domain layer:

  • Defining domain models
  • Defining rules, domain and business errors
  • Executing the application business logic
  • Enforcing the business rules

Domain layer

Domain models have data and behavior and represent the domain.
We have two approaches for designing - rich and anemic domain models.
Anemic models allow external manipulation of our data, and it's usually antipattern because
the domain object itself doesn't control its own data. Rich domain models contain both data and behavior, then
richer behavior then richer domain model. It exposes only a specific set of public methods, which allows manipulation of
data only in the way the domain approves, encapsulates logic, and does validations. Rich domain model
properties are read-only by default.

Domain models can be always valid or not, better prefer always valid domain models,
at any point of time when we're working with domain state, we know it's valid and don't need to write additional
validations to check it, always valid domain models mean all the time in the valid state.
And one more important detail is Persistence ignorance -
modeling the domain without taking into account how domain objects will be persisted.



class Account(
    val accountId: AccountId = AccountId(),
) {
    var contactInfo: ContactInfo = ContactInfo()
        private set
    var personalInfo: PersonalInfo = PersonalInfo()
        private set
    var address: Address = Address()
        private set
    var balance: Balance = Balance()
        private set
    var status: AccountStatus = AccountStatus.FREE
        private set

    var version: Long = 0
        private set
    var updatedAt: Instant? = null
        private set
    var createdAt: Instant? = null
        private set

    fun depositBalance(newBalance: Balance): Either<AppError, Account> = either {
        if (balance.balanceCurrency != newBalance.balanceCurrency) raise(InvalidBalanceCurrency("invalid currency: $newBalance"))
        if (newBalance.amount < 0) raise(InvalidBalanceAmount("invalid balance amount: $newBalance"))

        balance = balance.copy(amount = (balance.amount + newBalance.amount))
        updatedAt = Instant.now()

        this@Account
    }

    fun withdrawBalance(newBalance: Balance): Either<AppError, Account> = either {
        if (balance.balanceCurrency != newBalance.balanceCurrency) raise(InvalidBalanceCurrency("invalid currency: $newBalance"))
        if (newBalance.amount < 0) raise(InvalidBalanceAmount("invalid balance amount: $newBalance"))

        val newAmount = (balance.amount - newBalance.amount)
        if ((newAmount) < 0) raise(InvalidBalanceError("invalid balance: $newBalance"))

        balance = balance.copy(amount = newAmount)
        updatedAt = Instant.now()

        this@Account
    }

    fun updateStatus(newStatus: AccountStatus): Either<AppError, Account> = either {
        status = newStatus
        updatedAt = Instant.now()
        this@Account
    }

    fun changeContactInfo(newContactInfo: ContactInfo): Either<AppError, Account> = either {
        contactInfo = newContactInfo
        updatedAt = Instant.now()
        this@Account
    }

    fun changeAddress(newAddress: Address): Either<AppError, Account> = either {
        address = newAddress
        updatedAt = Instant.now()
        this@Account
    }

    fun changePersonalInfo(newPersonalInfo: PersonalInfo): Either<AppError, Account> = either {
        personalInfo = newPersonalInfo
        updatedAt = Instant.now()
        this@Account
    }

    fun incVersion(amount: Long = 1): Either<AppError, Account> = either {
        if (amount < 1) raise(InvalidVersion("invalid version: $amount"))
        version += amount
        updatedAt = Instant.now()
        this@Account
    }

    fun withVersion(amount: Long = 1): Account {
        version = amount
        updatedAt = Instant.now()
        return this
    }

    fun decVersion(amount: Long = 1): Either<AppError, Account> = either {
        if (amount < 1) raise(InvalidVersion("invalid version: $amount"))
        version -= amount
        updatedAt = Instant.now()
        this@Account
    }

    fun withUpdatedAt(newValue: Instant): Account {
        updatedAt = newValue
        return this
    }
}


Enter fullscreen mode Exit fullscreen mode

The next is the Infrastructure layer which contains implementations for external-facing services and is responsible
for:

  • Interacting with persistence solution
  • Interacting with other services (http clients, message brokers, etc.)
  • Has actual implementations of the interfaces from the application layer
  • Identity concerns

Infrastructure layer

At the Infrastructure layer we have implementations of the application layer interfaces. As the main write database
used PostgreSQL with r2dbc reactive driver, and *DatabaseClient
*
with raw SQL queries, but if we want to use ORM entity, anyway we still pass domain objects through the other layers
interfaces, and then inside the repository implementation code map to the ORM entities. For this project keep spring
annotations as it is, but if we want cleaner implementation it's possible to move them to another layer.
In this example project sql schema is simplified and not normalized.



interface AccountRepository {

    suspend fun getById(id: AccountId): Either<AppError, Account>

    suspend fun save(account: Account): Either<AppError, Account>

    suspend fun update(account: Account): Either<AppError, Account>
}

@Repository
class AccountRepositoryImpl(
    private val dbClient: DatabaseClient
) : AccountRepository {

    override suspend fun save(account: Account): Either<AppError, Account> = eitherScope<AppError, Account>(ctx) {
        dbClient.sql(INSERT_ACCOUNT_QUERY.trimMargin())
            .bindValues(account.withVersion(FIRST_VERSION).toPostgresEntityMap())
            .fetch()
            .rowsUpdated()
            .awaitSingle()

        account
    }

    override suspend fun update(account: Account): Either<AppError, Account> = eitherScope(ctx) {
        dbClient.sql(OPTIMISTIC_UPDATE_QUERY.trimMargin())
            .bindValues(account.withUpdatedAt(Instant.now()).toPostgresEntityMap(withOptimisticLock = true))
            .fetch()
            .rowsUpdated()
            .awaitSingle()

        account.incVersion().bind()
    }

    override suspend fun getById(id: AccountId): Either<AppError, Account> = eitherScope(ctx) {
        dbClient.sql(GET_ACCOUNT_BY_ID_QUERY.trimMargin())
            .bind(ID_FIELD, id.id)
            .map { row, _ -> row.toAccount() }
            .awaitSingleOrNull()
            ?: raise(AccountNotFoundError("account for id: $id not found"))
    }
}


Enter fullscreen mode Exit fullscreen mode

pg_schema

Important detail about outbox repository realisation:

Select_for_update

The outbox repository, important detail here is to be able to handle the case of multiple pod instances processing in
parallel outbox table, of course, we have idempotent consumers, but as we can, we have to avoid processing the same
table events more than one time, to prevent multiple instances select and publish the same events,
we use here FOR UPDATE SKIP LOCKED - this combination does the next thing,
when one instance tries to select a batch of outbox events if
some other instance already selected these records, first, one will skip locked records and select the next available
and not locked, and so on.
But again it's only my personal preferred way of implementation, use of only polling publisher is usually default one.
as alternative possible to use debezium for example, but it's up to you.



interface OutboxRepository {

    suspend fun insert(event: OutboxEvent): Either<AppError, OutboxEvent>

    suspend fun deleteWithLock(
        event: OutboxEvent,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, OutboxEvent>

    suspend fun deleteEventsWithLock(
        batchSize: Int,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, Unit>
}

@Component
class OutboxRepositoryImpl(
    private val dbClient: DatabaseClient,
    private val tx: TransactionalOperator
) : OutboxRepository {

    override suspend fun insert(event: OutboxEvent): Either<AppError, OutboxEvent> = eitherScope(ctx) {
        dbClient.sql(INSERT_OUTBOX_EVENT_QUERY.trimMargin())
            .bindValues(event.toPostgresValuesMap())
            .map { row, _ -> row.get(ROW_EVENT_ID, String::class.java) }
            .one()
            .awaitSingle()
            .let { event }
    }


    override suspend fun deleteWithLock(
        event: OutboxEvent,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, OutboxEvent> = eitherScope {
        tx.executeAndAwait {
            dbClient.sql(GET_OUTBOX_EVENT_BY_ID_FOR_UPDATE_SKIP_LOCKED_QUERY.trimMargin())
                .bindValues(mutableMapOf(EVENT_ID to event.eventId))
                .map { row, _ -> row.get(ROW_EVENT_ID, String::class.java) }
                .one()
                .awaitSingleOrNull()

            callback(event).bind()
            deleteOutboxEvent(event).bind()
            event
        }
    }


    override suspend fun deleteEventsWithLock(
        batchSize: Int,
        callback: suspend (event: OutboxEvent) -> Either<AppError, Unit>
    ): Either<AppError, Unit> = eitherScope(ctx) {
        tx.executeAndAwait {
            dbClient.sql(GET_OUTBOX_EVENTS_FOR_UPDATE_SKIP_LOCKED_QUERY.trimMargin())
                .bind(LIMIT, batchSize)
                .map { row, _ -> row.toOutboxEvent() }
                .all()
                .asFlow()
                .onStart { log.info { "start publishing outbox events batch: $batchSize" } }
                .onEach { callback(it).bind() }
                .onEach { event -> deleteOutboxEvent(event).bind() }
                .onCompletion { log.info { "completed publishing outbox events batch: $batchSize" } }
                .collect()
        }
    }

    private suspend fun deleteOutboxEvent(event: OutboxEvent): Either<AppError, Long> = eitherScope(ctx) {
        dbClient.sql(DELETE_OUTBOX_EVENT_BY_ID_QUERY)
            .bindValues(mutableMapOf(EVENT_ID to event.eventId))
            .fetch()
            .rowsUpdated()
            .awaitSingle()
    }
}


Enter fullscreen mode Exit fullscreen mode

The polling publisher implementation is a scheduled
process that does the same job for publishing and deleting events at the given interval,
as typed earlier and uses the same service method:



@Component
@ConditionalOnProperty(prefix = "schedulers", value = ["outbox.enable"], havingValue = "true")
class OutboxScheduler(
    private val outboxRepository: OutboxRepository,
    private val publisher: EventPublisher,
) {

    @Value("\${schedulers.outbox.batchSize}")
    private var batchSize: Int = 30

    @Scheduled(
        initialDelayString = "\${schedulers.outbox.initialDelayMillis}",
        fixedRateString = "\${schedulers.outbox.fixedRate}"
    )
    fun publishOutboxEvents() = runBlocking {
        eitherScope {
            outboxRepository.deleteEventsWithLock(batchSize) { publisher.publish(it) }.bind()
        }.fold(
            ifLeft = { err -> log.error { "error while publishing scheduler outbox events: $err" } },
            ifRight = { log.info { "outbox scheduler published events" } }
        )
    }
}


Enter fullscreen mode Exit fullscreen mode

kafka_message

Domain event is something interesting from a business point of view, that happened within the system, something that
already occurred. We're capturing the fact something happened with the system. After events have been published from the
outbox table to the broker, in this application, it consumes them from the Kafka, and the consumers themselves call
EventHandlerService methods, which builds a read model for out domain aggregates.

The read model of a CQRS-based system provides materialized views of the data, typically as highly denormalized
views. These views are tailored to the interfaces and display requirements of the application, which helps to maximize
both display and query performance. For error handling and retry, messages prefer to use separate retry topics and
listeners. Using the stream of events as the write store, rather than the actual data at a point in time, avoids update
conflicts on a single aggregate and maximizes performance and scalability. The events can be used to asynchronously
generate materialized views of the data that are used to populate the read store. As with any system where the write and
read stores are separate, systems based on this pattern are only eventually consistent. There will be some delay between
the event being generated and the data store being updated. Here is Kafka consumer implementation:



@Component
class BalanceDepositedEventConsumer(
    private val eventProcessor: EventProcessor,
    private val kafkaTopics: KafkaTopics
) {


    @KafkaListener(
        groupId = "\${kafka.consumer-group-id:account_microservice_group_id}",
        topics = ["\${topics.accountBalanceDeposited.name}"],
    )
    fun process(ack: Acknowledgment, record: ConsumerRecord<String, ByteArray>) = eventProcessor.process(
        ack = ack,
        consumerRecord = record,
        deserializationClazz = BalanceDepositedEvent::class.java,
        onError = eventProcessor.errorRetryHandler(kafkaTopics.accountBalanceDepositedRetry.name, DEFAULT_RETRY_COUNT)
    ) { event ->
        eventProcessor.on(
            ack = ack,
            consumerRecord = record,
            event = event,
            retryTopic = kafkaTopics.accountBalanceDepositedRetry.name
        )
    }

    @KafkaListener(
        groupId = "\${kafka.consumer-group-id:account_microservice_group_id}",
        topics = ["\${topics.accountBalanceDepositedRetry.name}"],
    )
    fun processRetry(ack: Acknowledgment, record: ConsumerRecord<String, ByteArray>) = eventProcessor.process(
        ack = ack,
        consumerRecord = record,
        deserializationClazz = BalanceDepositedEvent::class.java,
        onError = eventProcessor.errorRetryHandler(kafkaTopics.accountBalanceDepositedRetry.name, DEFAULT_RETRY_COUNT)
    ) { event ->
        eventProcessor.on(
            ack = ack,
            consumerRecord = record,
            event = event,
            retryTopic = kafkaTopics.accountBalanceDepositedRetry.name
        )
    }
}


Enter fullscreen mode Exit fullscreen mode

At the application layer AccountEventsHandlerService is implemented in the following way:



interface AccountEventHandlerService {

    suspend fun on(event: AccountCreatedEvent): Either<AppError, Unit>

    suspend fun on(event: BalanceDepositedEvent): Either<AppError, Unit>

    suspend fun on(event: BalanceWithdrawEvent): Either<AppError, Unit>

    suspend fun on(event: PersonalInfoUpdatedEvent): Either<AppError, Unit>

    suspend fun on(event: ContactInfoChangedEvent): Either<AppError, Unit>

    suspend fun on(event: AccountStatusChangedEvent): Either<AppError, Unit>
}

@Component
class AccountEventHandlerServiceImpl(
    private val accountProjectionRepository: AccountProjectionRepository
) : AccountEventHandlerService {

    override suspend fun on(event: AccountCreatedEvent): Either<AppError, Unit> = eitherScope(ctx) {
        accountProjectionRepository.save(event.toAccount()).bind()
    }

    override suspend fun on(event: BalanceDepositedEvent): Either<AppError, Unit> = eitherScope(ctx) {
        findAndUpdateAccountById(event.accountId, event.version) { account ->
            account.depositBalance(event.balance).bind()
        }.bind()
    }

    private suspend fun findAndUpdateAccountById(
        accountId: AccountId,
        eventVersion: Long,
        block: suspend (Account) -> Account
    ): Either<AppError, Account> = eitherScope(ctx) {
        val foundAccount = findAndValidateVersion(accountId, eventVersion).bind()
        val accountForUpdate = block(foundAccount)
        accountProjectionRepository.update(accountForUpdate).bind()
    }

    private suspend fun findAndValidateVersion(
        accountId: AccountId,
        eventVersion: Long
    ): Either<AppError, Account> = eitherScope(ctx) {
        val foundAccount = accountProjectionRepository.getById(accountId).bind()
        validateVersion(foundAccount, eventVersion).bind()
        foundAccount
    }
}


Enter fullscreen mode Exit fullscreen mode

The infrastructure layer read model repository uses MongoDB Kotlin coroutines driver:

mongo_schema



interface AccountProjectionRepository {

    suspend fun save(account: Account): Either<AppError, Account>

    suspend fun update(account: Account): Either<AppError, Account>

    suspend fun getById(id: AccountId): Either<AppError, Account>

    suspend fun getByEmail(email: String): Either<AppError, Account>

    suspend fun getAll(page: Int, size: Int): Either<AppError, AccountsList>

    suspend fun upsert(account: Account): Either<AppError, Account>
}


Enter fullscreen mode Exit fullscreen mode


@Component
class AccountProjectionRepositoryImpl(
    mongoClient: MongoClient,
) : AccountProjectionRepository {

    private val accountsDB = mongoClient.getDatabase(ACCOUNTS_DB)
    private val accountsCollection = accountsDB.getCollection<AccountDocument>(ACCOUNTS_COLLECTION)

    override suspend fun save(account: Account): Either<AppError, Account> = eitherScope<AppError, Account>(ctx) {
        val insertResult = accountsCollection.insertOne(account.toDocument())
        log.info { "account insertOneResult: $insertResult, account: $account" }
        account
    }

    override suspend fun update(account: Account): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(ACCOUNT_ID, account.accountId.string()), eq(VERSION, account.version))
        val options = FindOneAndUpdateOptions().upsert(false).returnDocument(ReturnDocument.AFTER)

        accountsCollection.findOneAndUpdate(
            filter,
            account.incVersion().bind().toBsonUpdate(),
            options
        )
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with id: ${account.accountId} not found"))
    }

    override suspend fun upsert(account: Account): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(ACCOUNT_ID, account.accountId.string()))
        val options = FindOneAndUpdateOptions().upsert(true).returnDocument(ReturnDocument.AFTER)

        accountsCollection.findOneAndUpdate(
            filter,
            account.toBsonUpdate(),
            options
        )
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with id: ${account.accountId} not found"))
    }

    override suspend fun getById(id: AccountId): Either<AppError, Account> = eitherScope(ctx) {
        accountsCollection.find<AccountDocument>(eq(ACCOUNT_ID, id.string()))
            .firstOrNull()
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with id: $id not found"))
    }

    override suspend fun getByEmail(email: String): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(CONTACT_INFO_EMAIL, email))
        accountsCollection.find(filter).firstOrNull()?.toAccount()
            ?: raise(AccountNotFoundError("account with email: $email not found"))
    }

    override suspend fun getAll(
        page: Int,
        size: Int
    ): Either<AppError, AccountsList> = eitherScope<AppError, AccountsList>(ctx) {
        parZip(coroutineContext, {
            accountsCollection.find()
                .skip(page * size)
                .limit(size)
                .map { it.toAccount() }
                .toList()
        }, {
            accountsCollection.find().count()
        }) { list, totalCount ->
            AccountsList(
                page = page,
                size = size,
                totalCount = totalCount,
                accountsList = list
            )
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

swagger_get_by_id_response

And read queries way through the layers is very similar, we accept HTTP requests at the api layer:



@Tag(name = "Accounts", description = "Account domain REST endpoints")
@RestController
@RequestMapping(path = ["/api/v1/accounts"])
class AccountController(
    private val accountCommandService: AccountCommandService,
    private val accountQueryService: AccountQueryService
) {
    @Operation(
        method = "getAccountByEmail", operationId = "getAccountByEmail", description = "Get account by email",
        responses = [
            ApiResponse(
                description = "Get account by email",
                responseCode = "200",
                content = [Content(
                    mediaType = MediaType.APPLICATION_JSON_VALUE,
                    schema = Schema(implementation = AccountResponse::class)
                )]
            ),
            ApiResponse(
                description = "bad request response",
                responseCode = "400",
                content = [Content(schema = Schema(implementation = ErrorHttpResponse::class))]
            )],
    )
    @GetMapping(path = ["/email/{email}"])
    suspend fun getAccountByEmail(
        @PathVariable @Email @Size(
            min = 6,
            max = 255
        ) email: String
    ): ResponseEntity<out Any> = eitherScope(ctx) {
        accountQueryService.handle(GetAccountByEmailQuery(email)).bind()
    }.fold(
        ifLeft = { mapErrorToResponse(it) },
        ifRight = { ResponseEntity.ok(it.toResponse()) }
    )
}


Enter fullscreen mode Exit fullscreen mode

the application layer AccountQueryService methods:



interface AccountQueryService {

    suspend fun handle(query: GetAccountByIdQuery): Either<AppError, Account>

    suspend fun handle(query: GetAccountByEmailQuery): Either<AppError, Account>

    suspend fun handle(query: GetAllAccountsQuery): Either<AppError, AccountsList>
}


Enter fullscreen mode Exit fullscreen mode


@Service
class AccountQueryServiceImpl(
    private val accountRepository: AccountRepository,
    private val accountProjectionRepository: AccountProjectionRepository
) : AccountQueryService {

    override suspend fun handle(query: GetAccountByIdQuery): Either<AppError, Account> = eitherScope(ctx) {
        accountRepository.getById(query.id).bind()
    }

    override suspend fun handle(query: GetAccountByEmailQuery): Either<AppError, Account> = eitherScope(ctx) {
        accountProjectionRepository.getByEmail(query.email).bind()
    }

    override suspend fun handle(query: GetAllAccountsQuery): Either<AppError, AccountsList> = eitherScope(ctx) {
        accountProjectionRepository.getAll(page = query.page, size = query.size).bind()
    }
}


Enter fullscreen mode Exit fullscreen mode

and it uses PostgreSQL or MongoDB repositories to get the data depending on the query use case:



@Component
class AccountProjectionRepositoryImpl(
    mongoClient: MongoClient,
) : AccountProjectionRepository {

    private val accountsDB = mongoClient.getDatabase(ACCOUNTS_DB)
    private val accountsCollection = accountsDB.getCollection<AccountDocument>(ACCOUNTS_COLLECTION)

    override suspend fun getByEmail(email: String): Either<AppError, Account> = eitherScope(ctx) {
        val filter = and(eq(CONTACT_INFO_EMAIL, email))
        accountsCollection.find(filter)
            .firstOrNull()
            ?.toAccount()
            ?: raise(AccountNotFoundError("account with email: $email not found"))
    }
}


Enter fullscreen mode Exit fullscreen mode

In real-world applications, we have to implement many more necessary features, like k8s health checks, circuit breakers,
rate limiters, etc., this project is simplified for demonstration example, purposes.
Source code is in my GitHub, please star it if helpful and useful for you.
I hope this article is useful and helpful, and be happy to receive any feedback or questions, feel free to contact me by email or any messengers :)

Top comments (0)