The Producer/Consumer (Bounded Buffer) Problem

This lesson introduces the famous producer/consumer problem and presents an attempt to solve it.


The next synchronization problem we will confront in this chapter is known as the producer/consumer problem, or sometimes as the bounded buffer problem, which was first posed by Dijkstra (“Information Streams Sharing a Finite Buffer” by E.W. Dijkstra. Information Processing Letters 1: 179–180, 1972. Available: http://www.cs.utexas.edu/users/EWD/ewd03xx/EWD329.PDF; the famous paper that introduced the producer/consumer problem). Indeed, it was this very producer/consumer problem that led Dijkstra and his co-workers to invent the generalized semaphore, which can be used as either a lock or a condition variable (“My recollections of operating system design” by E.W. Dijkstra. April, 2001. Available: http://www.cs.utexas.edu/users/EWD/ewd13xx/EWD1303.PDF; a fascinating read for those of you interested in how the pioneers of our field came up with some very basic and fundamental concepts, including ideas like “interrupts” and even “a stack”!). We will learn more about semaphores later.

The problem

Imagine one or more producer threads and one or more consumer threads. Producers generate data items and place them in a buffer; consumers grab said items from the buffer and consume them in some way.

This arrangement occurs in many real systems. For example, in a multi-threaded web server, a producer puts HTTP requests into a work queue (i.e., the bounded buffer), while the consumer threads take requests out of this queue and process them.

A bounded buffer is also used when you pipe the output of one program into another, e.g., grep foo file.txt | wc -l. This example runs two processes concurrently; grep writes lines from file.txt with the string foo in them to what it thinks is standard output; the UNIX shell redirects the output to what is called a UNIX pipe (created by the pipe system call). The other end of this pipe is connected to the standard input of the process wc, which simply counts the number of lines in the input stream and prints out the result. Thus, the grep process is the producer; the wc process is the consumer; between them is an in-kernel bounded buffer; you, in this example, are just the happy user.

Because the bounded buffer is a shared resource, we must, of course, require synchronized access to it, lest a race condition arise (this is where we drop some serious Old English on you, with the subjunctive form). To begin to understand this problem better, let’s examine some actual code.

The first thing needed is a shared buffer, into which a producer puts data, and out of which a consumer takes data. Let’s just use a single integer for simplicity (you can certainly imagine placing a pointer to a data structure into this slot instead), and the two inner routines to put a value into the shared buffer, and to get a value out of the buffer. See the code excerpt below for details.

int buffer;
int count = 0; // initially, empty

void put(int value) {
    assert(count == 0);
    count = 1;
    buffer = value;
}

int get() {
    assert(count == 1);
    count = 0;
    return buffer;
}

Pretty simple, no? The put() routine assumes the buffer is empty (and checks this with an assertion), and then simply puts a value into the shared buffer and marks it full by setting count to 1. The get() routine does the opposite, setting the buffer to empty (i.e., setting count to 0) and returning the value. Don’t worry that this shared buffer has just a single entry; later, you’ll generalize it to a queue that can hold multiple entries, which will be even more fun than it sounds.

Now you need to write some routines that know when it is OK to access the buffer to either put data into it or get data out of it. The conditions for this should be obvious: only put data into the buffer when count is zero (i.e., when the buffer is empty), and only get data from the buffer when count is one (i.e., when the buffer is full). If you write the synchronization code such that a producer puts data into a full buffer, or a consumer gets data from an empty one, you have done something wrong (and in this code, an assertion will fire).

This work is going to be done by two types of threads, one set of which is called the producer threads, and the other set, consumer threads. The code excerpt below shows the code for a producer that puts an integer into the shared buffer loops times, and a consumer that gets the data out of that shared buffer (forever), each time printing out the data item it pulled from the shared buffer.

void *producer(void *arg) {
    int i;
    int loops = (int) arg;
    for (i = 0; i < loops; i++) {
        put(i);
    }
    return NULL;
}

void *consumer(void *arg) {
    while (1) {
        int tmp = get();
        printf("%d\n", tmp);
    }
}

A broken solution

Now imagine that you have just a single producer and a single consumer. Obviously the put() and get() routines have critical sections within them, as put() updates the buffer, and get() reads from it. However, putting a lock around the code doesn’t work; you need something more.

Not surprisingly, that something more is some condition variables. In this (broken) first try (given below), we have a single condition variable cond and associated lock mutex.

int loops; // must initialize somewhere...
cond_t cond;
mutex_t mutex;

void *producer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);           // p1
        if (count == 1)                       // p2
            Pthread_cond_wait(&cond, &mutex); // p3
        put(i);                               // p4
        Pthread_cond_signal(&cond);           // p5
        Pthread_mutex_unlock(&mutex);         // p6
    }
    return NULL;
}

void *consumer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        Pthread_mutex_lock(&mutex);           // c1
        if (count == 0)                       // c2
            Pthread_cond_wait(&cond, &mutex); // c3
        int tmp = get();                      // c4
        Pthread_cond_signal(&cond);           // c5
        Pthread_mutex_unlock(&mutex);         // c6
        printf("%d\n", tmp);
    }
    return NULL;
}

Explanation

Let’s examine the signaling logic between producers and consumers. When a producer wants to fill the buffer, it waits for it to be empty (p1–p3). The consumer has the exact same logic but waits for a different condition: fullness (c1–c3).

With just a single producer and a single consumer, the code given above works. However, if you have more than one of these threads (e.g., two consumers), the solution has two critical problems. What are they?

… (pause here to think) …

Let’s understand the first problem, which has to do with the if statement before the wait. Assume there are two consumers (Tc1 and Tc2) and one producer (Tp). First, a consumer (Tc1) runs; it acquires the lock (c1), checks if any buffers are ready for consumption (c2), and finding that none are, waits (c3) (which releases the lock).

Then the producer (TpT_p) runs. It acquires the lock (p1), checks if all buffers are full (p2), and finding that not to be the case, goes ahead and fills the buffer (p4). The producer then signals that a buffer has been filled (p5). Critically, this moves the first consumer (Tc1T_{c1}) from sleeping on a condition variable to the ready queue; Tc1T_{c1} is now able to run (but not yet running). The producer then continues until realizing the buffer is full, at which point it sleeps (p6, p1–p3).

Here is where the problem occurs: another consumer (Tc2) sneaks in and consumes the one existing value in the buffer (c1, c2, c4, c5, c6, skipping the wait at c3 because the buffer is full). Now assume Tc1 runs; just before returning from the wait, it re-acquires the lock and then returns. It then calls get() (c4), but there are no buffers to consume! An assertion triggers and the code has not functioned as desired. Clearly, one should have somehow prevented Tc1 from trying to consume, because Tc2 snuck in and consumed the one value in the buffer that had been produced. The figure given below shows the action each thread takes, as well as its scheduler state (Ready, Running, or Sleeping) over time.

The problem arises for a simple reason: after the producer woke Tc1, but before Tc1 ever ran, the state of the bounded buffer changed (thanks to Tc2). Signaling a thread only wakes it up; it is thus a hint that the state of the world has changed (in this case, that a value has been placed in the buffer), but there is no guarantee that when the woken thread runs, the state will still be as desired. This interpretation of what a signal means is often referred to as Mesa semantics, after the first research that built a condition variable in such a manner (“Experience with Processes and Monitors in Mesa” by B.W. Lampson, D.R. Redell. Communications of the ACM, 23:2, pages 105–117, February 1980; a terrific paper about how to actually implement signaling and condition variables in a real system, leading to the term “Mesa” semantics for what it means to be woken up; the older semantics, developed by Tony Hoare, then became known as “Hoare” semantics, which is hard to say out loud in class with a straight face). The contrasting interpretation, referred to as Hoare semantics, is harder to build but provides a stronger guarantee that the woken thread will run immediately upon being woken (“Monitors: An Operating System Structuring Concept” by C.A.R. Hoare. Communications of the ACM, 17:10, pages 549–557, October 1974; Hoare did a fair amount of theoretical work in concurrency. However, he is still probably most known for his work on Quicksort, the coolest sorting algorithm in the world, at least according to these authors). Virtually every system ever built employs Mesa semantics.

Let’s continue this discussion of the producer/consumer problem in the next lesson.
