Extending The Solution for Multiple Producers And Consumers
Let's build on the solution from the previous lesson to handle the case of more than one producer and consumer.
We'll cover the following
Let us now imagine that MAX
is greater than 1 (say MAX=10
). For this example, let us assume that there are multiple producers and multiple consumers. We now have a problem: a race condition. Do you see where it occurs? (take some time and look for it in the code in the previous lesson) If you can’t see it, here’s a hint:
look more closely at the put()
and get()
code.
OK, let’s understand the issue. Imagine two producers ( and )
both calling into put()
at roughly the same time. Assume producer gets to run first, and just starts to fill the first buffer entry (fill=0
at Line F1). Before gets a chance to increment the fill counter to 1, it is interrupted. Producer starts to run, and at Line F1 it also puts its data into the 0th element of the buffer, which means that the old data there is overwritten! This action is a no-no; we don’t want any data from the producer to be lost.
A solution: adding mutual exclusion
As you can see, what we’ve forgotten here is mutual exclusion. The filling of a buffer and incrementing of the index into the buffer is a critical section, and thus must be guarded carefully. So let’s use our friend the binary semaphore and add some locks. Look at the code snippet given below.
void *producer(void *arg) {int i;for (i = 0; i < loops; i++) {sem_wait(&mutex); // Line P0 (NEW LINE)sem_wait(&empty); // Line P1put(i); // Line P2sem_post(&full); // Line P3sem_post(&mutex); // Line P4 (NEW LINE)}}void *consumer(void *arg) {int i;for (i = 0; i < loops; i++) {sem_wait(&mutex); // Line C0 (NEW LINE)sem_wait(&full); // Line C1int tmp = get(); // Line C2sem_post(&empty); // Line C3sem_post(&mutex); // Line C4 (NEW LINE)printf("%d\n", tmp);}}
Now we’ve added some locks around the entire put()
/get()
parts of the code, as indicated by the NEW LINE
comments. That seems like the right idea, but it also doesn’t work. Why? Deadlock. Why does deadlock occur? Take a moment to consider it; try to find a case where deadlock arises. What sequence of steps must happen for the program to deadlock?
Avoiding deadlock
OK, now that you figured it out, here is the answer. Imagine two threads, one producer and one consumer. The consumer gets to run first. It acquires the mutex (Line C0), and then calls sem_wait()
on the full semaphore (Line C1); because there is no data yet, this call causes the consumer to block and thus yield the CPU; importantly, though, the consumer still holds the lock.
A producer then runs. It has data to produce and if it were able to run, it would be able to wake the consumer thread and all would be good. Unfortunately, the first thing it does is call sem_wait()
on the binary mutex semaphore (Line P0). The lock is already held. Hence, the producer is now stuck waiting too.
There is a simple cycle here. The consumer holds the mutex and is waiting for someone to signal full. The producer could signal full but is waiting for the mutex. Thus, the producer and consumer are each stuck waiting for each other: a classic deadlock.
At last, a working solution
To solve this problem, we simply must reduce the scope of the lock. The code snippet below shows the correct solution.
void *producer(void *arg) {int i;for (i = 0; i < loops; i++) {sem_wait(&empty); // Line P1sem_wait(&mutex); // Line P1.5 (MUTEX HERE)put(i); // Line P2sem_post(&mutex); // Line P2.5sem_post(&full); // Line P3}}void *consumer(void *arg) {int i;for (i = 0; i < loops; i++) {sem_wait(&full); // Line C1sem_wait(&mutex); // Line C1.5 (MUTEX HERE)int tmp = get(); // Line C2sem_post(&mutex); // Line C2.5 (AND HERE)sem_post(&empty); // Line C3printf("%d\n", tmp);}}
As you can see, we simply move the mutex acquire and release to be just around the critical section;
Feel free to run and play around with the correct implementation below.
#include <stdio.h> #include <unistd.h> #include <assert.h> #include <pthread.h> #include <stdlib.h> #include "common.h" #include "common_threads.h" #include <semaphore.h> int max; int loops; int *buffer; int use = 0; int fill = 0; sem_t empty; sem_t full; sem_t mutex; #define CMAX (10) int consumers = 1; void put(int value) { buffer[fill] = value; fill++; if (fill == max) fill = 0; } int get() { int tmp = buffer[use]; use++; if (use == max) use = 0; return tmp; } void *producer(void *arg) { int i; for (i = 0; i < loops; i++) { Sem_wait(&empty); Sem_wait(&mutex); put(i); Sem_post(&mutex); Sem_post(&full); } // end case for (i = 0; i < consumers; i++) { Sem_wait(&empty); Sem_wait(&mutex); put(-1); Sem_post(&mutex); Sem_post(&full); } return NULL; } void *consumer(void *arg) { int tmp = 0; while (tmp != -1) { Sem_wait(&full); Sem_wait(&mutex); tmp = get(); Sem_post(&mutex); Sem_post(&empty); printf("%lld %d\n", (long long int) arg, tmp); } return NULL; } int main(int argc, char *argv[]) { if (argc != 4) { fprintf(stderr, "usage: %s <buffersize> <loops> <consumers>\n", argv[0]); exit(1); } max = atoi(argv[1]); loops = atoi(argv[2]); consumers = atoi(argv[3]); assert(consumers <= CMAX); buffer = (int *) malloc(max * sizeof(int)); assert(buffer != NULL); int i; for (i = 0; i < max; i++) { buffer[i] = 0; } Sem_init(&empty, max); // max are empty Sem_init(&full, 0); // 0 are full Sem_init(&mutex, 1); // mutex pthread_t pid, cid[CMAX]; Pthread_create(&pid, NULL, producer, NULL); for (i = 0; i < consumers; i++) { Pthread_create(&cid[i], NULL, consumer, (void *) (long long int) i); } Pthread_join(pid, NULL); for (i = 0; i < consumers; i++) { Pthread_join(cid[i], NULL); } return 0; }
Get hands-on with 1400+ tech skills courses.