Extending The Solution for Multiple Producers And Consumers

Let's build on the solution from the previous lesson to handle the case of more than one producer and consumer.

Let us now imagine that MAX is greater than 1 (say MAX=10). For this example, let us assume that there are multiple producers and multiple consumers. We now have a problem: a race condition. Do you see where it occurs? (Take some time and look for it in the code in the previous lesson.) If you can’t see it, here’s a hint: look more closely at the put() and get() code.

OK, let’s understand the issue. Imagine two producers (P_a and P_b) both calling into put() at roughly the same time. Assume producer P_a gets to run first, and just starts to fill the first buffer entry (fill=0 at Line F1). Before P_a gets a chance to increment the fill counter to 1, it is interrupted. Producer P_b starts to run, and at Line F1 it also puts its data into the 0th element of the buffer, which means that the old data there is overwritten! This action is a no-no; we don’t want any data from the producer to be lost.

A solution: adding mutual exclusion

As you can see, what we’ve forgotten here is mutual exclusion. The filling of a buffer and incrementing of the index into the buffer is a critical section, and thus must be guarded carefully. So let’s use our friend the binary semaphore and add some locks. Look at the code snippet given below.

void *producer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&mutex);     // Line P0 (NEW LINE)
        sem_wait(&empty);     // Line P1
        put(i);               // Line P2
        sem_post(&full);      // Line P3
        sem_post(&mutex);     // Line P4 (NEW LINE)
    }
    return NULL;
}

void *consumer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&mutex);     // Line C0 (NEW LINE)
        sem_wait(&full);      // Line C1
        int tmp = get();      // Line C2
        sem_post(&empty);     // Line C3
        sem_post(&mutex);     // Line C4 (NEW LINE)
        printf("%d\n", tmp);
    }
    return NULL;
}

Now we’ve added some locks around the entire put()/get() parts of the code, as indicated by the NEW LINE comments. That seems like the right idea, but it also doesn’t work. Why? Deadlock. Why does deadlock occur? Take a moment to consider it; try to find a case where deadlock arises. What sequence of steps must happen for the program to deadlock?

Avoiding deadlock

OK, now that you figured it out, here is the answer. Imagine two threads, one producer and one consumer. The consumer gets to run first. It acquires the mutex (Line C0), and then calls sem_wait() on the full semaphore (Line C1); because there is no data yet, this call causes the consumer to block and thus yield the CPU; importantly, though, the consumer still holds the lock.

A producer then runs. It has data to produce and if it were able to run, it would be able to wake the consumer thread and all would be good. Unfortunately, the first thing it does is call sem_wait() on the binary mutex semaphore (Line P0). The lock is already held. Hence, the producer is now stuck waiting too.

There is a simple cycle here. The consumer holds the mutex and is waiting for someone to signal full. The producer could signal full but is waiting for the mutex. Thus, the producer and consumer are each stuck waiting for each other: a classic deadlock.

At last, a working solution

To solve this problem, we simply must reduce the scope of the lock. The code snippet below shows the correct solution.

void *producer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&empty);     // Line P1
        sem_wait(&mutex);     // Line P1.5 (MUTEX HERE)
        put(i);               // Line P2
        sem_post(&mutex);     // Line P2.5 (AND HERE)
        sem_post(&full);      // Line P3
    }
    return NULL;
}

void *consumer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        sem_wait(&full);      // Line C1
        sem_wait(&mutex);     // Line C1.5 (MUTEX HERE)
        int tmp = get();      // Line C2
        sem_post(&mutex);     // Line C2.5 (AND HERE)
        sem_post(&empty);     // Line C3
        printf("%d\n", tmp);
    }
    return NULL;
}

As you can see, we simply move the mutex acquire and release to be just around the critical section; the full and empty wait and signal code is left outside. (Indeed, it may have been more natural to place the mutex acquire/release inside the put() and get() functions for the purposes of modularity.) The result is a simple and working bounded buffer, a commonly used pattern in multi-threaded programs. Understand it now; use it later. You will thank us for years to come. Or at least, you will thank us when the same question is asked on some exam, or during a job interview.

Feel free to run and play around with the correct implementation below.

#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#include "common.h"
#include "common_threads.h"
#include <semaphore.h>


int max;
int loops;
int *buffer;

int use  = 0;
int fill = 0;

sem_t empty;
sem_t full;
sem_t mutex;

#define CMAX (10)
int consumers = 1;

void put(int value) {
    buffer[fill] = value;
    fill++;
    if (fill == max)
        fill = 0;
}

int get() {
    int tmp = buffer[use];
    use++;
    if (use == max)
        use = 0;
    return tmp;
}

void *producer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
        Sem_wait(&empty);
        Sem_wait(&mutex);
        put(i);
        Sem_post(&mutex);
        Sem_post(&full);
    }

    // end case
    for (i = 0; i < consumers; i++) {
        Sem_wait(&empty);
        Sem_wait(&mutex);
        put(-1);
        Sem_post(&mutex);
        Sem_post(&full);
    }

    return NULL;
}
                                                                               
void *consumer(void *arg) {
    int tmp = 0;
    while (tmp != -1) {
        Sem_wait(&full);
        Sem_wait(&mutex);
        tmp = get();
        Sem_post(&mutex);
        Sem_post(&empty);
        printf("%lld %d\n", (long long int) arg, tmp);
    }
    return NULL;
}

int main(int argc, char *argv[]) {
    if (argc != 4) {
        fprintf(stderr, "usage: %s <buffersize> <loops> <consumers>\n", argv[0]);
        exit(1);
    }
    max   = atoi(argv[1]);
    loops = atoi(argv[2]);
    consumers = atoi(argv[3]);
    assert(consumers <= CMAX);

    buffer = (int *) malloc(max * sizeof(int));
    assert(buffer != NULL);
    int i;
    for (i = 0; i < max; i++) {
        buffer[i] = 0;
    }

    Sem_init(&empty, max); // max are empty 
    Sem_init(&full, 0);    // 0 are full
    Sem_init(&mutex, 1);   // mutex

    pthread_t pid, cid[CMAX];
    Pthread_create(&pid, NULL, producer, NULL); 
    for (i = 0; i < consumers; i++) {
        Pthread_create(&cid[i], NULL, consumer, (void *) (long long int) i); 
    }
    Pthread_join(pid, NULL); 
    for (i = 0; i < consumers; i++) {
        Pthread_join(cid[i], NULL); 
    }
    return 0;
}
