Concurrency is a fundamental challenge when designing distributed systems, especially when dealing with shared resources like databases, account balances, or any critical operation that requires synchronization. In this article, we will explore how to implement a semaphore using Redis to control access to a shared resource. The focus will be on ensuring that specific operations, such as bank transfers, are processed one at a time per user, preventing multiple transfers from happening simultaneously for the same user.
What is a Semaphore?
A semaphore is a synchronization primitive used to control access to a shared resource. Think of it as a lock with multiple permits, where only a defined number of processes or threads can access the resource simultaneously. In our case, the semaphore has a single permit per user, so it behaves like a mutex: a user can perform only one money transfer at a time. This prevents issues such as race conditions or double transfers that could lead to inconsistencies in account balances.
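To make the idea of permits concrete, here is a minimal in-process sketch. The LocalSemaphore class is hypothetical and exists only for illustration; it is not distributed and is not part of the Redis implementation built later in this article.

```javascript
// Minimal in-process counting semaphore (illustrative only; not distributed).
class LocalSemaphore {
  constructor(permits) {
    this.permits = permits; // how many holders may be inside at once
  }

  // Try to take a permit without waiting; returns true on success.
  tryAcquire() {
    if (this.permits === 0) return false;
    this.permits -= 1;
    return true;
  }

  // Hand a permit back so another caller can take it.
  release() {
    this.permits += 1;
  }
}

const oneAtATime = new LocalSemaphore(1);
console.log(oneAtATime.tryAcquire()); // true: the first caller gets the only permit
console.log(oneAtATime.tryAcquire()); // false: the second caller is rejected
oneAtATime.release();
console.log(oneAtATime.tryAcquire()); // true: the permit is available again
```

With one permit the semaphore acts as a simple lock, which is the configuration used for transfers in the rest of the article.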
Why Use Redis for Semaphore?
Redis is an in-memory key-value store known for its performance. Because every instance of our application can talk to the same Redis server, a key stored there can act as a semaphore that all instances respect. This makes Redis a practical choice for managing access to resources like user transactions in a scalable and efficient way.
Use Case: A Money Transfer System
Let's assume you are working on a money transfer system, where a user can transfer funds to another user. To ensure no user initiates more than one transfer at the same time (e.g., multiple clicks leading to duplicate transactions), we need to introduce a semaphore.
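The problem is easiest to see in a small simulation. The sketch below is not the article's transfer code: the account object, pause, and unsafeTransfer are hypothetical stand-ins that show how two overlapping requests can both pass the balance check when nothing serializes them.

```javascript
// Hypothetical in-memory account, used only to illustrate the race.
const account = { balance: 100 };

// Stands in for any asynchronous step (database call, network hop).
const pause = () => new Promise((resolve) => setTimeout(resolve, 10));

async function unsafeTransfer(amount) {
  const balanceSeen = account.balance;     // 1. read
  if (balanceSeen < amount) return false;  // 2. check
  await pause();                           // another request can run here
  account.balance = balanceSeen - amount;  // 3. write based on a stale read
  return true;
}

async function simulateDoubleClick() {
  account.balance = 100;
  const results = await Promise.all([unsafeTransfer(80), unsafeTransfer(80)]);
  return { results, finalBalance: account.balance };
}

simulateDoubleClick().then(({ results, finalBalance }) => {
  // Logs: [ true, true ] 20
  // Both transfers report success although the account could afford only one.
  console.log(results, finalBalance);
});
```

Each call reads the balance before the other has written it, so 160 is sent from an account that held 100. Allowing only one transfer per user at a time closes this window.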
Here’s how we will approach this:
- Singleton Pattern: We will ensure that the AccountsRepositoryInMemory class is a singleton, meaning there's only one instance of it, regardless of how many times it is accessed. This guarantees a single source of truth for account information.
- Transfer Semaphore: A semaphore will control access to each user’s transfer, ensuring that no two transfers for the same user happen at the same time.
- Redis Integration: We will use Redis to store the semaphore, so that every instance of the application sees the same lock state.
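The article's SingletonFactory is imported later but its source is not shown. The following is a hypothetical sketch of what such a factory might look like, assuming it only needs a getInstance(Class) method; the repository class here is a stand-in, not the real one.

```javascript
// Hypothetical sketch of a SingletonFactory; the real implementation is not shown in the article.
class SingletonFactory {
  // Maps each class (constructor function) to its single shared instance.
  static instances = new Map();

  static getInstance(Class) {
    if (!SingletonFactory.instances.has(Class)) {
      SingletonFactory.instances.set(Class, new Class());
    }
    return SingletonFactory.instances.get(Class);
  }
}

// Stand-in for the article's repository, reduced to a bare in-memory map.
class AccountsRepositoryInMemory {
  constructor() {
    this.accounts = new Map();
  }
}

const a = SingletonFactory.getInstance(AccountsRepositoryInMemory);
const b = SingletonFactory.getInstance(AccountsRepositoryInMemory);
console.log(a === b); // true: both calls return the same instance
```

Note that a singleton like this is shared only within one Node.js process. Each application instance would still hold its own in-memory repository, which is why the lock itself is kept in Redis.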
Step 1: The Semaphore Class
The Semaphore class is responsible for interacting with Redis and acquiring or releasing a lock (semaphore) for a given key, which represents a user in this case.
const Redis = require('ioredis');

class Semaphore {
  constructor() {
    /**
     * @type {Redis.Redis}
     */
    this.redis = new Redis();
  }

  /**
   * Acquires a semaphore for the given key (userId).
   *
   * @param {string} key - The key for the semaphore (e.g., userId).
   * @returns {Promise<boolean>} - Returns true if the semaphore was successfully acquired.
   */
  async acquireSemaphore(key) {
    const ttl = 5000; // Lock TTL (Time To Live) of 5 seconds.
    const acquired = await this.redis.set(key, 'locked', 'PX', ttl, 'NX');
    return acquired === 'OK'; // Returns true if the lock was acquired.
  }

  /**
   * Releases the semaphore for the given key (userId).
   *
   * @param {string} key - The key for the semaphore.
   * @returns {Promise<void>}
   */
  async releaseSemaphore(key) {
    await this.redis.del(key); // Release the lock.
  }
}

module.exports = { Semaphore };
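One caveat of releasing with a plain DEL: if a holder's lock expires and another request acquires it, the first holder's late release would delete the second holder's lock. A common way to guard against this is to store a unique token as the value and delete the key only if the token still matches. The sketch below is a variation under stated assumptions, not the article's class: OwnedSemaphore and its method names are hypothetical, and the redis argument is assumed to be an ioredis-compatible client exposing set() and eval().

```javascript
const { randomUUID } = require('crypto');

// Lua runs atomically inside Redis: delete the key only if it still holds our token.
const RELEASE_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
else
  return 0
end`;

// Hypothetical variant of the Semaphore class that tracks lock ownership.
class OwnedSemaphore {
  // `redis` is assumed to be an ioredis-compatible client (set and eval methods).
  constructor(redis, ttlMs = 5000) {
    this.redis = redis;
    this.ttlMs = ttlMs;
  }

  // Returns a unique token on success, or null if someone else holds the lock.
  async acquire(key) {
    const token = randomUUID();
    const reply = await this.redis.set(key, token, 'PX', this.ttlMs, 'NX');
    return reply === 'OK' ? token : null;
  }

  // Returns true only if this token still owned the lock and the key was deleted.
  async release(key, token) {
    const deleted = await this.redis.eval(RELEASE_SCRIPT, 1, key, token);
    return deleted === 1;
  }
}
```

The caller keeps the token returned by acquire and passes it back to release, for example `const token = await semaphore.acquire(key)` followed later by `await semaphore.release(key, token)`.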
Step 2: Implementing Transfer Semaphore
The TransferSemaphore class extends the basic Semaphore class to work specifically with user IDs. It maps the userId to a Redis key for acquiring and releasing semaphores.
const { Semaphore } = require("../../../shared/infra/cache/semaphore");

class TransferSemaphore extends Semaphore {
  constructor() {
    super();
  }

  /**
   * Acquires a semaphore for a specific user during a transfer.
   *
   * @param {string} userId - The user initiating the transfer.
   * @returns {Promise<boolean>} - Returns true if the semaphore was successfully acquired.
   */
  async acquireSemaphore(userId) {
    const key = `transfer_lock_${userId}`;
    return super.acquireSemaphore(key);
  }

  /**
   * Releases the semaphore for a specific user after a transfer.
   *
   * @param {string} userId - The user who completed the transfer.
   * @returns {Promise<void>}
   */
  async releaseSemaphore(userId) {
    const key = `transfer_lock_${userId}`;
    return super.releaseSemaphore(key);
  }
}

module.exports = { TransferSemaphore };
Step 3: TransferMoneyUseCase – Business Logic with Semaphore
The TransferMoneyUseCase class uses the TransferSemaphore to ensure that a user cannot initiate multiple transfers simultaneously. It handles acquiring and releasing the semaphore around the transfer process.
const { AppError } = require("../../../shared/error/AppError");
const SingletonFactory = require("../../../shared/infra/factories/SingletonFactory");
const { TransferSemaphore } = require("../../infra/cache/TransferSemaphore");
const { AccountsRepositoryInMemory } = require("../../infra/repositories/in-memory/AccountsRepositoryInMemory");

/**
 * @typedef {object} TransferMoneyUseCaseExecuteParams
 * @property {string} senderId
 * @property {string} receiverId
 * @property {number} amount
 */
class TransferMoneyUseCase {
  constructor() {
    this.accountsRepository = SingletonFactory.getInstance(AccountsRepositoryInMemory);
    this.transferSemaphore = new TransferSemaphore();
  }

  /**
   * Executes the money transfer between two users.
   *
   * @param {TransferMoneyUseCaseExecuteParams} parameters
   * @returns {Promise<void>}
   */
  async execute({ senderId, receiverId, amount }) {
    // Take the lock first, so the sender's balance is only read while we hold it.
    const acquiredSemaphore = await this.transferSemaphore.acquireSemaphore(senderId);
    if (!acquiredSemaphore) {
      throw new AppError("Transfer in progress. Please try again later");
    }

    try {
      const foundSender = await this.accountsRepository.findAccountById(senderId);
      if (!foundSender) {
        throw new AppError("Sender account not found");
      }

      const foundReceiver = await this.accountsRepository.findAccountById(receiverId);
      if (!foundReceiver) {
        throw new AppError("Receiver account not found");
      }

      if (foundSender.balance < amount) {
        throw new AppError("Insufficient funds");
      }

      foundSender.balance -= amount;
      foundReceiver.balance += amount;
      await this.accountsRepository.update(foundSender);
      await this.accountsRepository.update(foundReceiver);
    } finally {
      // Release the lock whether the transfer succeeded or threw.
      await this.transferSemaphore.releaseSemaphore(senderId);
    }
  }
}

module.exports = TransferMoneyUseCase;
How the Semaphore Works in this Context
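- Acquiring the Semaphore: When a user initiates a transfer, the system tries to create a semaphore (lock) key for the user with the acquireSemaphore method. Because the Redis SET command with the NX option succeeds only when the key does not already exist, checking and acquiring happen in a single atomic step. If the lock is available (i.e., no ongoing transfer for that user), the transfer proceeds; otherwise the request is rejected.
- Releasing the Semaphore: After the transfer is complete (either successful or unsuccessful), the semaphore is released, allowing the user to perform a new transfer.
- TTL (Time To Live): We set a TTL of 5 seconds for the semaphore. This ensures that, even if something goes wrong (e.g., a failure in releasing the semaphore), it will be automatically released by Redis after 5 seconds, preventing the user from being locked out indefinitely. The trade-off is that a transfer running longer than 5 seconds loses its lock before it finishes, so the TTL should comfortably exceed the longest expected transfer.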
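The acquire, run, release cycle described above can be captured in a small helper. This is a sketch rather than part of the article's code: withSemaphore is a hypothetical function, and it assumes only that the semaphore object exposes acquireSemaphore(key) and releaseSemaphore(key) like the classes in this article.

```javascript
// Runs `task` while holding the semaphore for `key`, then releases it.
// `semaphore` is assumed to expose acquireSemaphore(key) and releaseSemaphore(key).
async function withSemaphore(semaphore, key, task) {
  const acquired = await semaphore.acquireSemaphore(key);
  if (!acquired) {
    throw new Error('Resource is busy. Please try again later');
  }
  try {
    return await task();
  } finally {
    // Runs on success and on failure, so a thrown error does not
    // leave the lock in place until the TTL expires.
    await semaphore.releaseSemaphore(key);
  }
}
```

A call might look like `await withSemaphore(transferSemaphore, senderId, () => doTransfer())`, where doTransfer is whatever work must not overlap for that user. The TTL then serves as a safety net for cases the finally block cannot cover, such as a process crash.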
Benefits of Using Semaphore with Redis
- Concurrency Control: The semaphore ensures that only one transfer can occur per user at a time, preventing race conditions or duplicated transactions.
- Distributed and Scalable: Because the lock lives in Redis rather than in the memory of a single process, it is respected by every instance of our application that connects to the same Redis server. This is crucial in cloud-based or microservice architectures.
- Efficiency: Redis operates in-memory, making it extremely fast for managing locks, which is essential for real-time financial transactions.
Conclusion
Implementing semaphores using Redis is a powerful technique to manage concurrency in distributed systems. In our example, we demonstrated how this can be applied to a bank transfer use case, ensuring that users cannot perform more than one transfer at a time. This approach prevents potential race conditions, ensuring data integrity and providing a better user experience.
By utilizing Redis for semaphores, we also take advantage of its distributed nature, ensuring that our application can scale efficiently and handle concurrent requests in a cloud or multi-instance environment.