As a follow-up to my previous post "Making Better HTTP APIs", I wrote a simple Node.js server that demonstrates how to synchronize concurrent requests so that certain parts of the business logic are not executed twice.
I used the example from the previous post, a Payment API, and wrote a simple server that follows the POST/PUT pattern for resource creation but does not (yet) handle concurrent PUT requests correctly. First, let's take a look at the basic implementation; afterwards, we will extend the server to synchronize concurrent requests.
The server has two handlers: POST /payments and PUT /payments/id.
app.post('/payments', (req, res) => {
  const paymentId = nextPaymentId++;
  const context = `request(post) #${nextRequestId++}`;
  handle(() => createPayment(context, paymentId), res);
});

app.put('/payments/:id', (req, res) => {
  const context = `request(put) #${nextRequestId++}`;
  const paymentId = req.params.id;
  handle(() => conductPayment(context, paymentId), res);
});
Both handlers define the context variable, which includes the request ID. The context is useful for grouping log messages produced by the same request. Additionally, the POST /payments handler generates a new payment ID. After that, both handlers delegate the execution to the handle function, which invokes the correct business logic function and handles the HTTP response.
The handle function is quite simple too. It assumes that the business function either returns an object to be sent to the client or throws an error. Note that the error handling could be improved by using extended error classes, since every error is currently reported as 409 Conflict:
async function handle(fn, res) {
  try {
    const result = await fn();
    if (result) return res.status(200).json(result);
    res.status(204).end();
  } catch (err) {
    res.status(409).json({
      error: err.message,
    });
  }
}
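The remark about extended error classes could look roughly like the sketch below. The class names (`HttpError`, `NotFoundError`, `ConflictError`) and the helpers `statusFor` and `handleWithStatus` are hypothetical and not part of the original server; the idea is only that each error carries its own HTTP status, so the catch block no longer has to answer 409 for everything.

```javascript
// Hypothetical error hierarchy: each error class knows its HTTP status.
class HttpError extends Error {
  constructor(status, message) {
    super(message);
    this.name = this.constructor.name;
    this.status = status;
  }
}

class NotFoundError extends HttpError {
  constructor(message = 'Payment does not exist') {
    super(404, message);
  }
}

class ConflictError extends HttpError {
  constructor(message = 'Payment is in progress. Try again later.') {
    super(409, message);
  }
}

// Maps any thrown value to a status code; unknown errors become 500.
function statusFor(err) {
  return err instanceof HttpError ? err.status : 500;
}

// Same shape as handle(), but the status comes from the error itself.
async function handleWithStatus(fn, res) {
  try {
    const result = await fn();
    if (result) return res.status(200).json(result);
    res.status(204).end();
  } catch (err) {
    res.status(statusFor(err)).json({ error: err.message });
  }
}
```

With something like this, conductPayment could throw a `NotFoundError` for a missing payment and a `ConflictError` for a payment that is in progress, and the client would see 404 and 409 respectively.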
Now let's examine the business logic. The createPayment function does nothing more than store the payment ID, marking the payment as empty. The conductPayment function is more complex:
async function conductPayment(context, paymentId) {
  const payment = await getPayment(context, paymentId);
  if (!payment) {
    throw new Error('Payment does not exist');
  }
  if (payment.state === 'PROCESSING') {
    throw new Error('Payment is in progress. Try again later.');
  }
  if (payment.state === 'PAID') {
    return payment;
  }
  if (payment.state === 'EMPTY') {
    // Return here; otherwise a successful payment falls through to the error below.
    return processPayment(context, paymentId);
  }
  throw new Error('Payment is in bad state');
}
This function retrieves the payment object first and then examines its state. If the payment is neither paid nor being processed at the moment, the function invokes processPayment. In the real world this is a lengthy operation that typically involves a call to a 3rd-party service, so I have simulated it using setTimeout: the execution of processPayment takes about 3 seconds.
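The helper functions themselves are not shown in this post, so here is a minimal sketch of how they might look, assuming an in-memory Map as the "database". The `payments` map, the `sleep` helper, the `PROCESSING_DELAY_MS` constant and the optional `delayMs` parameter are my assumptions for illustration; only the function names, the `(context, paymentId)` arguments and the state names come from the code above.

```javascript
// Hypothetical in-memory "database"; the real server may store payments differently.
const payments = new Map();

// Assumed delay that stands in for the slow 3rd-party call (about 3 seconds in the post).
const PROCESSING_DELAY_MS = 3000;

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function createPayment(context, paymentId) {
  // IDs are normalized to strings: POST generates a number, PUT receives a string from the URL.
  const payment = { id: String(paymentId), state: 'EMPTY' };
  payments.set(payment.id, payment);
  console.log(`${context}: created payment ${payment.id}`);
  return payment;
}

async function getPayment(context, paymentId) {
  return payments.get(String(paymentId));
}

async function processPayment(context, paymentId, delayMs = PROCESSING_DELAY_MS) {
  const payment = payments.get(String(paymentId));
  payment.state = 'PROCESSING';
  console.log(`${context}: processing payment ${payment.id}`);
  await sleep(delayMs); // simulated 3rd-party call
  payment.state = 'PAID';
  console.log(`${context}: payment ${payment.id} is paid`);
  return payment;
}
```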
Let's summarize what the server is capable of at the moment:
1) It can handle concurrent POST /payments requests. Empty payments stored in the database have no external side effects, and we can clean them up later.
2) It can handle only sequential PUT /payments/id requests for the same ID.
Point #2 may not be evident at first glance, but if we examine the code of the conductPayment function, we notice a time gap between await getPayment and await processPayment. Between those two calls, a concurrent request can arrive and read the same payment state. Thus, a concurrent request can start a parallel (and duplicate) payment process.
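To make the time gap tangible, here is a small self-contained simulation. It is a sketch, not the server's code: the store, the delays and the `chargeCount` counter are made up for illustration, and the database latency is modeled as a delay between reading the state and handing it back to the caller.

```javascript
// Self-contained demo of the race; all names and delays here are illustrative.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const payments = new Map([['1', { id: '1', state: 'EMPTY' }]]);
let chargeCount = 0; // how many times the "3rd party" was called

async function getPayment(context, paymentId) {
  const payment = payments.get(paymentId);
  const snapshot = payment && { ...payment }; // state as of the moment of the read
  await sleep(10); // simulated latency before the result reaches the caller
  return snapshot;
}

async function processPayment(context, paymentId) {
  payments.get(paymentId).state = 'PROCESSING';
  chargeCount += 1;
  await sleep(50); // simulated 3rd-party call (about 3 seconds in the post)
  payments.get(paymentId).state = 'PAID';
  return { ...payments.get(paymentId) };
}

// Simplified conductPayment with the same read-then-act shape as above.
async function conductPayment(context, paymentId) {
  const payment = await getPayment(context, paymentId);
  if (!payment) throw new Error('Payment does not exist');
  if (payment.state === 'PROCESSING') {
    throw new Error('Payment is in progress. Try again later.');
  }
  if (payment.state === 'PAID') return payment;
  return processPayment(context, paymentId);
}

// Two "requests" for the same payment arrive before either read has returned.
async function demoRace() {
  await Promise.all([
    conductPayment('request(put) #1', '1'),
    conductPayment('request(put) #2', '1'),
  ]);
  return chargeCount;
}

const raceResult = demoRace();
raceResult.then((count) => console.log(`3rd party was called ${count} time(s)`));
// prints: 3rd party was called 2 time(s)
```

Both requests read the EMPTY state before either of them switches it to PROCESSING, so both pass the checks and the payment is charged twice.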
Synchronization of concurrent HTTP requests
To avoid problems with concurrent requests, we just need to make sure that no request for the same payment ID can start while another request is in the code section between await getPayment and await processPayment. There are several ways to achieve this:
1) Queuing. Instead of executing the conductPayment function immediately, the server could put a message on a queue dedicated to the corresponding payment. Another process (a worker) would fetch the messages for a payment ID one at a time, thus eliminating the problem of concurrent execution. This approach is a good and scalable solution with one drawback: it makes the architecture more complicated, with several processes to manage and a message broker to maintain.
2) Locking. We could use either an optimistic or a pessimistic locking strategy. With pessimistic locking, we could use the database or something else, for example Redis, to ensure that no concurrent request can enter conductPayment while another request is in progress. With optimistic locking, we could check whether the payment state is still EMPTY while trying to change it to PROCESSING (atomically). If this fails, we would throw an error and not send the payment to the 3rd party.
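The optimistic variant is not implemented in this post, but a minimal sketch of the idea could look as follows. Everything here is an assumption for illustration: the in-memory store, the `compareAndSetState` helper, `chargeThirdParty` and `conductPaymentOptimistic`. With a SQL database, the same check-and-change step would typically be a single conditional UPDATE that only matches rows still in the EMPTY state, followed by a check of the affected row count.

```javascript
// Hypothetical in-memory store used only for this sketch.
const payments = new Map([['1', { id: '1', state: 'EMPTY' }]]);
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
let chargeCount = 0; // how many times the "3rd party" was called

// Atomic within a single Node process: there is no await between check and write.
function compareAndSetState(paymentId, expected, next) {
  const payment = payments.get(paymentId);
  if (!payment || payment.state !== expected) return false;
  payment.state = next;
  return true;
}

async function getPayment(context, paymentId) {
  const payment = payments.get(paymentId);
  const snapshot = payment && { ...payment };
  await sleep(10); // simulated latency, so the snapshot may be stale on arrival
  return snapshot;
}

async function chargeThirdParty(context, paymentId) {
  chargeCount += 1;
  await sleep(20); // simulated 3rd-party call
}

async function conductPaymentOptimistic(context, paymentId) {
  const payment = await getPayment(context, paymentId);
  if (!payment) throw new Error('Payment does not exist');
  if (payment.state === 'PAID') return payment;
  // Only the request that wins the EMPTY -> PROCESSING transition may charge.
  if (!compareAndSetState(paymentId, 'EMPTY', 'PROCESSING')) {
    throw new Error('Payment is in progress. Try again later.');
  }
  await chargeThirdParty(context, paymentId);
  compareAndSetState(paymentId, 'PROCESSING', 'PAID');
  return { ...payments.get(paymentId) };
}

// Two concurrent requests: one succeeds, the other is rejected without charging.
const optimisticResult = Promise.allSettled([
  conductPaymentOptimistic('request(put) #1', '1'),
  conductPaymentOptimistic('request(put) #2', '1'),
]);
optimisticResult.then((results) => {
  console.log(results.map((r) => r.status), `charges: ${chargeCount}`);
});
```

Even though both requests read a stale EMPTY state, only one of them can perform the state transition, so the 3rd party is contacted once.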
Since this is not an article about locking or queuing, I will only show how the pessimistic locking strategy could look in code.
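app.put('/payments/:id', (req, res) => {
  const context = `request(put) #${nextRequestId++}`;
  const paymentId = req.params.id;
  // The payment ID doubles as the lock ID, so only requests for the same payment exclude each other.
  handleWithLock(context, paymentId, () => conductPayment(context, paymentId), res);
});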
Here, the handleWithLock function works exactly like handle, but it ensures that only one instance of the business logic can be running at a time. This is how one could implement it:
async function handleWithLock(context, lockId, fn, res) {
  let acquired = false; // release only a lock that this request actually holds
  try {
    const lockState = await lock(context, lockId); // per paymentId
    if (lockState === 'locked') throw new Error('Resource is locked.');
    acquired = true;
    const result = await fn();
    if (result) {
      return res.status(200).json(result);
    }
    res.status(204).end();
  } catch (err) {
    res.status(409).json({
      error: err.message,
    });
  } finally {
    if (acquired) await unlock(context, lockId);
  }
}
It is essential that the lock function guarantees that only one process can acquire the lock. It is also vital that the lock is released if the Node process crashes (or that the lock expires after some time). In this simple example, I implemented basic in-memory locks. For production-ready implementations that are supposed to work for a cluster of Node processes, something like PostgreSQL advisory locks or Redlock can be used. Once the processing finishes, the lock is released using the unlock function.
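For illustration, a basic in-memory lock with expiry might be sketched like this. It is not necessarily the implementation used in the server: the `locks` map, the `LOCK_TTL_MS` constant, the `'acquired'` return value and the optional `ttlMs` and `now` parameters are assumptions; only the `lock(context, lockId)` and `unlock(context, lockId)` call shapes and the `'locked'` result come from the code above.

```javascript
// Hypothetical in-memory lock table: lockId -> expiry timestamp (ms since epoch).
const locks = new Map();

// Assumed upper bound for how long a single request may hold a lock.
const LOCK_TTL_MS = 10000;

// Returns 'acquired' if the caller now holds the lock, 'locked' if someone else does.
// There is no await between the check and the write, so within one Node process
// two callers cannot both acquire the same lock.
async function lock(context, lockId, ttlMs = LOCK_TTL_MS, now = Date.now()) {
  const expiresAt = locks.get(lockId);
  if (expiresAt !== undefined && expiresAt > now) {
    return 'locked';
  }
  // Either nobody holds the lock or the previous holder's lock has expired.
  locks.set(lockId, now + ttlMs);
  return 'acquired';
}

// Releases the lock unconditionally; this sketch does not track who owns it.
async function unlock(context, lockId) {
  locks.delete(lockId);
}
```

Because this `unlock` does not verify ownership, the caller has to make sure it only releases locks it has actually acquired. The expiry covers the case where a request never reaches its unlock call.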
In this implementation, the handleWithLock function throws an error if the resource is locked. Alternatively, the server could wait until the resource is free again using a spinlock. Below you can see the simple server in action.
The full server code can be found here: gist.
If I missed some of the ways to implement the synchronization or you spot a mistake in the code, please let me know and follow me on Twitter.
Originally published on my blog at 60devs.