Hello guys, in the previous tutorial, we created a simple program that consisted of DonutStorage, and a Consumer who consumed donuts. We had multiple consumers run by different threads at the same time. We had to synchronize them to solve a race condition.
But what if we also want to include producers? We’ll first need to change the app to follow the best OOP practices. Second, we’ll get a producer-consumer problem, find out what it is, and solve it with wait
and notify
methods. Then we’ll learn how it can be easily done with BlockingQueue
. Let us begin!
Table of Contents
1. Adding producers
2. Producer-consumer problem
3. Wait and notify
1) On what object to call the methods
2) What’s an InterruptedException
3) What happens when a thread meets the wait method
4) What happens when the thread gets notified
5) Why use notifyAll over notify
6) The importance of keeping wait within a while loop
7) When should you notify?
4. BlockingQueue
5. BlockingQueue vs wait and notify
6. Further reading
Adding producers
Code recap from the previous tutorial:
public class DonutStorage {
private int donutsNumber;
public DonutStorage(int donutsNumber) { this.donutsNumber = donutsNumber; }
public int getDonutsNumber() { return donutsNumber; }
public void setDonutsNumber(int donutsNumber) { this.donutsNumber = donutsNumber; }
}
public class Consumer {
private final DonutStorage donutStorage;
public Consumer(DonutStorage donutStorage) {
this.donutStorage = donutStorage;
}
/**
* Subtracts the given number from the DonutStorage's donutsNumber. If the given number is bigger
* than the number of donuts in stock, sets the donutsNumber to 0.
* @param numberOfItemsToConsume Number that will be subtracted from the donutsNumber
* @return the number of consumed items
*/
public int consume(int numberOfItemsToConsume) {
synchronized (donutStorage) {
int donutsNumber = donutStorage.getDonutsNumber();
// if there aren't enough donuts in stock, consume as many as there are
if (numberOfItemsToConsume > donutsNumber) {
donutStorage.setDonutsNumber(0);
return donutsNumber;
}
donutStorage.setDonutsNumber(donutsNumber - numberOfItemsToConsume);
return numberOfItemsToConsume;
}
}
}
public class Main {
public static void main(String[] args) {
int consumersNumber = 10;
DonutStorage donutStorage = new DonutStorage(20);
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<?>> futures = new ArrayList<>(consumersNumber);
for (int i = 0; i < consumersNumber; i++) {
futures.add(executor.submit(() -> {
Consumer consumer = new Consumer(donutStorage);
System.out.println(Thread.currentThread().getName() + " consumed " +
consumer.consume(3));
}));
}
executor.shutdown();
// make the main thread wait for others to finish
for (Future<?> future: futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println("Exception while getting from future" + e.getMessage());
e.printStackTrace();
}
}
System.out.println("Number of remaining donuts: " + donutStorage.getDonutsNumber());
}
}
So we have the DonutStorage
with a single variable donutsNumber
, getters and setters for it, and a constructor. We also have the Consumer
class, which has its instance of DonutStorage
and a single method for consuming items. In the Main
class, we create a DonutStorage
with the initial donutsNumber
equal to 20. Then, we create 10 separate threads. Each creates a new Consumer
and consumes 3 items. In addition, we want to print how many donuts are left before the program exits, but to make this work, we need to create a list of futures and loop through them. This way, the main thread will wait until the others finish.
To add producers, let’s first figure out what they are going to do. They will simply add the number specified in the param to the donutsNumber
. But we also want some protection because the storage probably has some limits. So let’s also add a donutsCapacity
field to the DonutStorage
. And before updating the donutsNumber
, we’ll check if there is enough space to add this number of donuts. If not, we’ll add as many as possible and return the number of added donuts. Here is the code:
public int produce(int numberOfItems) {
int donutsNumber = donutStorage.getDonutsNumber();
int donutsCapacity = donutStorage.getDonutsCapacity();
// Represents a number of donuts the client can put before the storage
// capacity is reached.
int availableSpace = donutsCapacity - donutsNumber;
if (numberOfItems > availableSpace) {
donutStorage.setDonutsNumber(donutsCapacity);
return availableSpace;
}
donutStorage.setDonutsNumber(donutsNumber + numberOfItems);
return numberOfItems;
}
But it wouldn’t be a good decision to create a separate class called Producer
. That’s because the Producer
and Consumer
classes would be almost identical. They would both have a DonutStorage
instance and a single method with the same argument. The only thing that would be different is the operation they would do, which would be defined in the body of that method. In addition, the operation should be done synchronously, which would be difficult to realize using a superclass and its subclasses.
That’s why I propose creating one class and passing the operation it’s meant to do in the constructor. To achieve this in Java, we’ll need to create a FunctionalInterface
and pass its instance in the constructor (ala functional programming):
/**
* Client is meant to be a producer or consumer. Each client has access to the
* DonutStorage does the respective action with donuts.
*/
public class Client {
private final ClientOperation clientOperation;
private final DonutStorage donutStorage;
public Client(ClientOperation clientOperation, DonutStorage donutStorage) {
this.clientOperation = clientOperation;
this.donutStorage = donutStorage;
}
/**
*
*@paramnumberOfItems number that represents how to change the donutsNumber
*@return number that represents how the donutsNumber was changed
*/
public int operate(int numberOfItems) {
synchronized (donutStorage) {
return clientOperation.operate(donutStorage, numberOfItems);
}
}
@FunctionalInterface
public interface ClientOperation {
int operate(DonutStorage donutStorage, int numberOfItems);
}
}
This way, when someone creates a Client
, they will have to specify the operation they want the Client
to do. And this also guarantees that the operation will be done synchronously because of the synchronized block within the operate
method.
For now, we plan the Client
to be either a producer or a consumer, whereas other classes are allowed to define their own functionality of the Client
. To avoid writing boilerplate code and possibly some bugs in the future, let’s define this functionality right inside the Client
class. We can achieve this using public static final
variables:
public static final ClientOperation consume = (donutStorage, numberOfItems) -> {
int donutsNumber = donutStorage.getDonutsNumber();
// if there aren't enough donuts in stock, consume as many as there are
if (numberOfItems > donutsNumber) {
donutStorage.setDonutsNumber(0);
return donutsNumber;
}
donutStorage.setDonutsNumber(donutsNumber - numberOfItems);
return numberOfItems;
};
public static final ClientOperation produce = (donutStorage, numberOfItems) -> {
int donutsNumber = donutStorage.getDonutsNumber();
int donutsCapacity = donutStorage.getDonutsCapacity();
// Represents a number of donuts the client can put before the storage
// capacity is reached.
int availableSpace = donutsCapacity - donutsNumber;
if (numberOfItems > availableSpace) {
donutStorage.setDonutsNumber(donutsCapacity);
return availableSpace;
}
donutStorage.setDonutsNumber(donutsNumber + numberOfItems);
return numberOfItems;
};
Here lambdas shorten the code of creating a new instance of ClientOperation
and overriding the operate
method.
And finally, let’s make the ClientOperation
private. This way, we’ll restrict defining a custom functionality to the Client
. It will either produce or consume items.
Now we don’t need the Consumer
class anymore. The only thing left is to make minor changes to the Main
class:
You can compare the old and the updated versions here.
Client consumer = new Client(Client.consume, donutStorage);
System.out.println(Thread.currentThread().getName() + " consumed " +
consumer.operate(3));
Producer-consumer problem
Now let’s test our program. Let’s create 10 consumers, each consuming 5 items, and 5 producers, each producing 6 items.
int consumersNumber = 10;
int producersNumber = 5;
DonutStorage donutStorage = new DonutStorage(20);
ExecutorService executor = Executors.newFixedThreadPool(consumersNumber+producersNumber);
List<Future<?>> futures = new ArrayList<>(consumersNumber);
for (int i = 0; i < consumersNumber; i++) {
futures.add(executor.submit(() -> {
Client consumer = new Client(Client.consume, donutStorage);
System.out.println(Thread.currentThread().getName() + " consumed " +
consumer.operate(5));
}));
}
for (int i = 0; i < producersNumber; i++) {
futures.add(executor.submit(() -> {
Client producer = new Client(Client.produce, donutStorage);
System.out.println(Thread.currentThread().getName() + " produced " +
producer.operate(6));
}));
}
The initial number of donuts in the storage is 20. So we expect the remaining number of items to be 0. But in reality, I get the resulting number equal to 30. Here is the full output:
pool-1-thread-2 consumed 0
pool-1-thread-3 consumed 5
pool-1-thread-1 consumed 5
pool-1-thread-7 consumed 5
pool-1-thread-5 consumed 0
pool-1-thread-1 produced 6
pool-1-thread-7 produced 6
pool-1-thread-8 consumed 0
pool-1-thread-6 consumed 5
pool-1-thread-7 produced 6
pool-1-thread-1 produced 6
pool-1-thread-5 produced 6
pool-1-thread-3 consumed 0
pool-1-thread-2 consumed 0
pool-1-thread-4 consumed 0
Number of remaining donuts: 30
What happened is all the consumers finished their work (consumed 20 items) before the producers came and added 30 extra donuts. And these events weren’t printed in the order because of the thread interleaving.
Wait and notify
The problem can be solved with wait
and notify
. They are methods of the Object
class that block the current thread until it gets notified. So we could just call wait
on the consumer threads when there are not enough items in the stock, and notify
them when we are sure that the stock is not empty anymore. Similarly, we could block the producers when the storage is full and notify them, when a consumer takes an item.
So is that how the updated consumer part would look like?
public static final ClientOperation consume= (donutStorage, numberOfItems) -> {
int donutsNumber = donutStorage.getDonutsNumber();
// if there aren't enough donuts in stock, consume as many as there are
if (numberOfItems > donutsNumber) {
donutStorage.setDonutsNumber(0);
wait(); // wait until producers add enough items
}
donutStorage.setDonutsNumber(donutsNumber - numberOfItems);
// notify producers that it's not full anymore as we've just took some items
notify();
return numberOfItems;
};
Unfortunately, it’s not that easy. We must consider the following:
1. On what object to call the methods
The code above would end with the exceptions:
Non-static method 'wait()' cannot be referenced from a static context
Non-static method 'notify()' cannot be referenced from a static context
It also wouldn’t work if we defined separate objects that represent the fullness and emptiness of the storage. We would get an
IllegalMonitorStateException
– thrown if the current thread is not the owner of the object's monitor.
This phrase is written so clearly that we don’t understand anything. But I hope to clear it up in a moment ⬇️
If we were allowed to call wait()
and notify()
on some random object we would end up with lots of bugs. That’s because we may have another part of the program where other threads are waiting for something else to happen.
The exception means that they want us to call wait
and notify
on that object we synchronized our method on. In our case, it’s the donutStorage
. If we call it on another object, it will not work. It also won’t work if we call the methods in an unsynchronized block of code.
2. What’s an InterruptedException
We are obliged to handle this exception that may be thrown by the wait
method. It’s an exception that indicates that someone canceled the operation or the OS wants to terminate the thread. If that happens, the thread does not finish its work. It will do what is written in the try/catch block. Read more about how to handle InterruptedException
.
In our case, we will just print that the thread was interrupted and return the number of consumed/produced items.
3. What happens when a thread meets the wait
method
When the wait
method is called, the first thing the thread does is release the lock it has held. Otherwise, other threads would be waiting for that lock and will not be able to update the variable. And the program would come to a deadlock situation.
4. What happens when the thread gets notified
The thread continues its work from that line of code with the wait
method. So we should write some code to execute after that. In our case, the consumer will consume the rest of the items:
public static final ClientOperation consume= (donutStorage, numberOfItems) -> {
// if there aren't enough donuts in stock, consume as many as there are and wait for more
if (numberOfItems > donutStorage.getDonutsNumber()) {
int numberOfTakenItems = donutStorage.getDonutsNumber();
int numberOfItemsToTake = numberOfItems - numberOfTakenItems;
donutStorage.setDonutsNumber(0);
try {
donutStorage.wait(); // wait until producers add enough items
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
" was interrupted and didn't consume the desired amount of items.");
return numberOfTakenItems;
}
donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItemsToTake);
} else donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItems);
// notify producers that it's not full anymore as we've just took some items
donutStorage.notify();
return numberOfItems;
};
Mind you that we no longer use a local variable as a contraction for donutStorage.getDonutsNumber()
. That’s because this variable is no longer constant as the thread calling the wait()
method releases the lock and the donutsNumber
gets updated by other threads.
5. Why use notifyAll
over notify
Another method is notifyAll()
, which wakes up all the threads that are waiting. This is different from notify()
, which only wakes up one random thread from the wait set. notify()
is more efficient, but it has a risk. If the chosen thread cannot continue, it will be blocked again. If no other thread calls notify()
again, the system will deadlock.
6. The importance of keeping wait
within a while
loop
So it’s better to use notifyAll()
. This brings another problem. When one of the multiple consumers gets notified, it might consume some items, leaving the others with insufficient items.
Even though the access to the donutStorage
is synchronized, a thread may acquire a lock, and consume items. When another thread acquires that lock, it will continue from that line of code where the wait
method is and will not double-check the condition. So, for preventing that, it’s recommended to always put the wait
method in the while
loop, even when you use notify
.
7. When should you notify?
The rule of thumb is to use notify
whenever the object's state changes in a way that could benefit the threads that are waiting. For example, when the numberOfDonuts
changes, the waiting threads should have another opportunity to check it.
With all being said, here is the updated code ⬇️. You can also compare the old and the updated versions here.
public static final ClientOperation consume = (donutStorage, numberOfItems) -> {
if (numberOfItems > donutStorage.getDonutsNumber()) {
// if there aren't enough donuts in stock, consume as many as there are
int numberOfTakenItems = donutStorage.getDonutsNumber();
int numberOfItemsToTake = numberOfItems - numberOfTakenItems;
donutStorage.setDonutsNumber(0);
// but wait in case producers put some more items
while (numberOfItemsToTake > donutStorage.getDonutsNumber()) {
try {
donutStorage.wait();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
" was interrupted and didn't consume the desired amount of items.");
return numberOfTakenItems;
}
}
donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItemsToTake);
} else donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() - numberOfItems);
// notify the producers that it's not full anymore as we've just took some items
donutStorage.notifyAll();
return numberOfItems;
};
public static final ClientOperation produce = (donutStorage, numberOfItems) -> {
final int donutsCapacity = donutStorage.getDonutsCapacity();
// Represents a number of donuts the client can put before the storage
// capacity is reached. We will take exactly this amount of donuts if
// there is not enough space.
int availableSpace = donutsCapacity - donutStorage.getDonutsNumber();
// Number of items the producer hasn't put yet
int numberOfItemsToPut = numberOfItems - availableSpace;
if (numberOfItems > (availableSpace)) {
donutStorage.setDonutsNumber(donutsCapacity);
while (numberOfItems > (donutsCapacity - donutStorage.getDonutsNumber())) {
try {
donutStorage.wait();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() +
" was interrupted and didn't produce the desired amount of items.");
return availableSpace;
}
}
donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() + numberOfItemsToPut);
} else donutStorage.setDonutsNumber(donutStorage.getDonutsNumber() + numberOfItems);
// notify the consumers that it's not empty anymore as we've just took some items
donutStorage.notifyAll();
return numberOfItems;
};
BlockingQueue
The code above runs well, and we’ve fixed the issue. But there is another one. When consumers want more items than producers ever put, the program will run infinitely. That’s because the consumers will wait for more producers to put some items. But all the producers may have already finished their work. The same will happen if producers want to put more items than the storage can hold, and all the consumers have left and don’t need more items.
You can check it yourself by setting the producers' number to 5, the consumers' number to 10, and let them produce and consume 10 items each. Make sure that the initial `donutsNumber' is 20. The program will run infinitely.
This can be solved by specifying the time the producers and consumers will wait before they finish. But the program has got very complicated and I’m eager to introduce a simpler and more effective solution — BlockingQueue
.
This basically does all the inter-thread communication and synchronization for us. It’s very easy to implement. BlockingQueue
stores all the items in an array. When the array is empty, it internally blocks all the consumers and notifies them when some items have been added. A similar is done with the producers.
The primary methods for putting into the queue and taking from it are put
and take
. But I use offer
and poll
to solve the problem described above. They put one item into the queue or take one from it. I also use add
to initially populate the queue with some items.
You can find the final code on my GitHub. Make sure to star the repo! ⭐
DanielRendox / ThreadSynchronizationInJava
A simple learning project that demonstrates ways to solve such multithreading problems as race conditions, deadlocks, and producer-consumer problem.
ThreadSynchronizationInJava
A simple learning project that demonstrates ways to solve such multithreading problems as race conditions, deadlocks, and producer-consumer problem.
The project forms the basis of the following tutorials:
BlockingQueue vs wait and notify
There are various tools like BlockingQueue out there that provide a simple and effective solution to the producer-consumer problem. It’s not recommended to use old-school wait and notify for this purpose as they require special effort and your code will be bug-prone as opposed to well-tested ready solutions.
However, wait and notify provide more flexibility and control over the synchronization mechanism. For example, using BlockingQueue is designed to work with custom objects. So ideally, we would create a class called Donut for that and put these donuts into the queue. But we don’t need them. What only matters in our program is the number of items not getting bigger than the capacity and not getting negative. So in our case, we waste some resources on creating an array of simple empty Objects. But even so, the pros justify the cons.
Nevertheless, if that’s not true for your app, it may be reasonable to use such tools as wait and notify. The reason why it’s the most popular solution to the producer-consumer problem is that it’s easier to understand inter-thread communication using wait and notify. You may also get asked to solve a producer-consumer problem with wait and notify on your interview.
I hope you learned how to solve a producer-consumer problem from this tutorial. Put a unicorn 🦄 on this article to indicate that you managed to read the whole post. Your and my effort should be seen! See you later!
Further reading
Check out my other articles about multithreading in Java:
🤯 Thread, Runnable, Callable, ExecutorService, and Future - all the ways to create threads in Java
Daniel Rendox ・ Jun 7 '23
🛡️ What is a Race Condition in Java, and how it can be prevented using synchronized and AtomicInteger
Daniel Rendox ・ Jun 17 '23
And some other useful resources as well:
Top comments (0)