The Producer/Consumer (Bounded Buffer) Problem
This lesson introduces the famous producer/consumer problem and presents an attempt to solve it.
The next synchronization problem we will confront in this chapter is known as the producer/consumer problem, or sometimes as the bounded buffer problem.
The problem
Imagine one or more producer threads and one or more consumer threads. Producers generate data items and place them in a buffer; consumers grab said items from the buffer and consume them in some way.
This arrangement occurs in many real systems. For example, in a multi-threaded web server, a producer puts HTTP requests into a work queue (i.e., the bounded buffer), while consumer threads take requests out of this queue and process them.
A bounded buffer is also used when you pipe the output of one program into another, e.g., grep foo file.txt | wc -l. This example runs two processes concurrently; grep writes lines from file.txt with the string foo in them to what it thinks is standard output; the UNIX shell redirects the output to what is called a UNIX pipe (created by the pipe system call). The other end of this pipe is connected to the standard input of the process wc, which simply counts the number of lines in the input stream and prints out the result. Thus, the grep process is the producer; the wc process is the consumer; between them is an in-kernel bounded buffer; you, in this example, are just the happy user.
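The pipe arrangement described above can be sketched in a few lines of C. This is only an illustration under stated assumptions, not how the shell or grep is implemented: the helper name count_lines_through_pipe and the "foo N" line contents are made up for the example. A child process plays the producer and writes lines into a pipe; the parent plays the consumer and counts newlines, much as wc -l would.

```c
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: a child (producer) writes `nlines` lines into a
 * pipe while the parent (consumer) reads them and counts newlines.
 * Returns the number of lines seen, or -1 if pipe() or fork() fails. */
int count_lines_through_pipe(int nlines) {
    int fds[2];
    if (pipe(fds) == -1)
        return -1;

    pid_t pid = fork();
    if (pid == -1) {
        close(fds[0]);
        close(fds[1]);
        return -1;
    }

    if (pid == 0) {
        /* Producer: write() blocks whenever the in-kernel buffer is full. */
        close(fds[0]);
        for (int i = 0; i < nlines; i++) {
            char line[32];
            int len = snprintf(line, sizeof line, "foo %d\n", i);
            if (write(fds[1], line, (size_t)len) != len)
                _exit(1);
        }
        close(fds[1]);
        _exit(0);
    }

    /* Consumer: read() blocks whenever the in-kernel buffer is empty,
     * and returns 0 once the producer has closed its end. */
    close(fds[1]);
    int count = 0;
    char buf[256];
    ssize_t n;
    while ((n = read(fds[0], buf, sizeof buf)) > 0) {
        for (ssize_t i = 0; i < n; i++) {
            if (buf[i] == '\n')
                count++;
        }
    }
    close(fds[0]);
    waitpid(pid, NULL, 0);
    return count;
}
```

Calling count_lines_through_pipe(100000) pushes more data than the pipe is likely to hold at once, so the producer has to block and wait for the consumer along the way; here the kernel does the synchronization that the rest of this lesson builds by hand.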
Because the bounded buffer is a shared resource, we must, of course, require synchronized access to it.
The first thing needed is a shared buffer, into which a producer puts data, and out of which a consumer takes data. Let’s just use a single integer for simplicity (you can certainly imagine placing a pointer to a data structure into this slot instead), along with two inner routines: one to put a value into the shared buffer, and one to get a value out of it. See the code excerpt below for details.
#include <assert.h>

int buffer;
int count = 0; // initially, empty

void put(int value) {
    assert(count == 0); // buffer must be empty
    count = 1;
    buffer = value;
}

int get() {
    assert(count == 1); // buffer must be full
    count = 0;
    return buffer;
}
Pretty simple, no? The put() routine assumes the buffer is empty (and checks this with an assertion), and then simply puts a value into the shared buffer and marks it full by setting count to 1. The get() routine does the opposite, setting the buffer to empty (i.e., setting count to 0) and returning the value. Don’t worry that this shared buffer has just a single entry; later, you’ll generalize it to a queue that can hold multiple entries, which will be even more fun than it sounds.
Now you need to write some routines that know when it is OK to access the buffer to either put data into it or get data out of it. The conditions for this should be obvious: only put data into the buffer when count is zero (i.e., when the buffer is empty), and only get data from the buffer when count is one (i.e., when the buffer is full). If you write the synchronization code such that a producer puts data into a full buffer, or a consumer gets data from an empty one, you have done something wrong (and in this code, an assertion will fire).
This work is going to be done by two types of threads: producer threads and consumer threads. The code excerpt below shows a producer that puts an integer into the shared buffer loops number of times, and a consumer that gets the data out of that shared buffer (forever), each time printing out the data item it pulled from the shared buffer.
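#include <stdint.h>
#include <stdio.h>

void *producer(void *arg) {
    int i;
    int loops = (int)(intptr_t) arg; // loop count is passed in the pointer-sized argument
    for (i = 0; i < loops; i++) {
        put(i);
    }
    return NULL;
}

void *consumer(void *arg) {
    while (1) {
        int tmp = get();
        printf("%d\n", tmp);
    }
}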
A broken solution
Now imagine that you have just a single producer and a single consumer. Obviously the put() and get() routines have critical sections within them, as put() updates the buffer, and get() reads from it. However, putting a lock around the code doesn’t work; you need something more.
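One way to see why a lock alone falls short is the sketch below. The names try_put and try_get are hypothetical and exist only for this illustration. The mutex keeps accesses to buffer and count from interleaving, but a thread that finds the buffer in the wrong state has no way to sleep until that state changes; all it can do is give up or retry in a loop.

```c
#include <pthread.h>
#include <stdbool.h>

static int buffer;
static int count = 0; /* 0 = empty, 1 = full */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical lock-only put: returns false if the buffer is already full. */
bool try_put(int value) {
    bool ok = false;
    pthread_mutex_lock(&mutex);
    if (count == 0) {
        buffer = value;
        count = 1;
        ok = true;
    }
    pthread_mutex_unlock(&mutex);
    return ok;
}

/* Hypothetical lock-only get: returns false if the buffer is empty. */
bool try_get(int *out) {
    bool ok = false;
    pthread_mutex_lock(&mutex);
    if (count == 1) {
        *out = buffer;
        count = 0;
        ok = true;
    }
    pthread_mutex_unlock(&mutex);
    return ok;
}
```

A consumer built on try_get() would have to spin, calling it over and over until a producer happens to run. What is missing is a way to wait for a condition to become true.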
Not surprisingly, that something more is some condition variables. In this (broken) first try (given below), we have a single condition variable cond and associated lock mutex.
int loops; // must initialize somewhere...
cond_t cond;
mutex_t mutex;

void *producer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);           // p1
        if (count == 1)                       // p2
            Pthread_cond_wait(&cond, &mutex); // p3
        put(i);                               // p4
        Pthread_cond_signal(&cond);           // p5
        Pthread_mutex_unlock(&mutex);         // p6
    }
    return NULL;
}

void *consumer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);           // c1
        if (count == 0)                       // c2
            Pthread_cond_wait(&cond, &mutex); // c3
        int tmp = get();                      // c4
        Pthread_cond_signal(&cond);           // c5
        Pthread_mutex_unlock(&mutex);         // c6
        printf("%d\n", tmp);
    }
    return NULL;
}
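The capitalized Pthread_* calls and the cond_t and mutex_t types used in the listing are not part of POSIX, and this lesson does not define them. A minimal sketch, assuming they are thin wrappers that call the corresponding lowercase pthread routine and assert that it succeeded, might look like this:

```c
#include <assert.h>
#include <pthread.h>

/* Assumed aliases for the POSIX types (not defined in the lesson). */
typedef pthread_mutex_t mutex_t;
typedef pthread_cond_t cond_t;

/* Assumed wrappers: call the real routine and check its return code. */
void Pthread_mutex_lock(mutex_t *m) {
    int rc = pthread_mutex_lock(m);
    assert(rc == 0);
    (void) rc; /* silence unused warning when NDEBUG is defined */
}

void Pthread_mutex_unlock(mutex_t *m) {
    int rc = pthread_mutex_unlock(m);
    assert(rc == 0);
    (void) rc;
}

void Pthread_cond_wait(cond_t *c, mutex_t *m) {
    /* Atomically releases m and sleeps; re-acquires m before returning. */
    int rc = pthread_cond_wait(c, m);
    assert(rc == 0);
    (void) rc;
}

void Pthread_cond_signal(cond_t *c) {
    /* Wakes at least one waiter if any are blocked on c; otherwise a no-op. */
    int rc = pthread_cond_signal(c);
    assert(rc == 0);
    (void) rc;
}
```

Under these assumptions, cond and mutex would also need initializing before use, for example with PTHREAD_COND_INITIALIZER and PTHREAD_MUTEX_INITIALIZER.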
Explanation
Let’s examine the signaling logic between producers and consumers. When a producer wants to fill the buffer, it waits for it to be empty (p1–p3). The consumer has the exact same logic but waits for a different condition: fullness (c1–c3).
With just a single producer and a single consumer, the code given above works. However, if you have more than one of these threads (e.g., two consumers), the solution has two critical problems. What are they?
… (pause here to think) …
Let’s understand the first problem, which has to do with the if statement before the wait. Assume there are two consumers (Tc1 and Tc2) and one producer (Tp). First, a consumer (Tc1) runs; it acquires the lock (c1), checks if any buffers are ready for consumption (c2), and finding that none are, waits (c3) (which releases the lock).
Then the producer (Tp) runs. It acquires the lock (p1), checks if all buffers are full (p2), and finding that not to be the case, goes ahead and fills the buffer (p4). The producer then signals that a buffer has been filled (p5). Critically, this moves the first consumer (Tc1) from sleeping on a condition variable to the ready queue; Tc1 is now able to run (but not yet running). The producer then continues until realizing the buffer is full, at which point it sleeps (p6, p1–p3).
Here is where the problem occurs: another consumer (Tc2) sneaks in and consumes the one existing value in the buffer (c1, c2, c4, c5, c6, skipping the wait at c3 because the buffer is full). Now assume Tc1 runs; just before returning from the wait, it re-acquires the lock and then returns. It then calls get() (c4), but there are no buffers to consume! An assertion triggers and the code has not functioned as desired. Clearly, one should have somehow prevented Tc1 from trying to consume because Tc2 snuck in and consumed the one value in the buffer that had been produced. The figure given below shows the action each thread takes, as well as its scheduler state (Ready, Running, or Sleeping) over time.
The problem arises for a simple reason: after the producer woke the first consumer (Tc1), but before Tc1 ever ran, the state of the bounded buffer changed (thanks to the second consumer, Tc2). Signaling a thread only wakes it up; it is thus a hint that the state of the world has changed (in this case, that a value has been placed in the buffer), but there is no guarantee that when the woken thread runs, the state will still be as desired. This interpretation of what a signal means is often referred to as Mesa semantics.
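The failing interleaving can also be replayed deterministically, without real threads, by stepping through the same sequence of actions on the one-slot buffer. The sketch below is an illustration only: put_checked, get_checked, and replay_broken_schedule are hypothetical names, the value 42 is arbitrary, and the "threads" are just comments marking whose step is being simulated. The checked routines return false where the lesson's put() and get() would fire an assertion.

```c
#include <stdbool.h>
#include <stdio.h>

static int buffer;
static int count = 0; /* 0 = empty, 1 = full */

/* Like put(), but reports a violated precondition instead of asserting. */
static bool put_checked(int value) {
    if (count != 0)
        return false;
    count = 1;
    buffer = value;
    return true;
}

/* Like get(), but reports a violated precondition instead of asserting. */
static bool get_checked(int *out) {
    if (count != 1)
        return false;
    count = 0;
    *out = buffer;
    return true;
}

/* Replays the schedule from the text in a single thread. Returns true
 * if the woken consumer ends up calling get on an empty buffer. */
bool replay_broken_schedule(void) {
    int tmp = 0;
    count = 0;

    /* First consumer: c1, c2 finds the buffer empty, so it waits at c3. */
    bool first_slept = (count == 0);
    printf("first consumer sleeps: %d\n", first_slept);

    /* Producer: p1, p2 finds the buffer not full, p4 fills it, and
     * p5 signals, which only moves the first consumer to Ready. */
    bool produced = put_checked(42);
    printf("producer filled buffer: %d\n", produced);

    /* Second consumer: c1, c2 finds the buffer full, skips c3, and
     * consumes the value at c4. */
    bool second_got = get_checked(&tmp);
    printf("second consumer got value: %d\n", second_got);

    /* First consumer: returns from c3 and, because the check was an
     * if, proceeds straight to c4 without looking at count again. */
    bool first_got = get_checked(&tmp);
    printf("first consumer got value: %d\n", first_got);

    return first_slept && produced && second_got && !first_got;
}
```

The function returns true: by the time the first consumer runs, the condition it was signaled about no longer holds.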
Let’s continue this discussion of the producer/consumer problem in the next lesson.