Last update: 12-Dec-2021
Author: R. Koucha
Implementation of a producer/consumer algorithm in C language
Introduction

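The so-called "producer/consumer problem" involves multiple producer threads writing into a shared resource and multiple consumer threads reading from it. Synchronization is needed to make sure that several consumers can read the resource at the same time, whereas only one producer modifies it at any moment, and only when no reader is active.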

An implementation can be based on condition variables, using the following services: pthread_cond_init(), pthread_cond_wait(), pthread_cond_signal()...
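
Before looking at the full programs, the basic wait/signal pattern of a condition variable can be sketched as follows. This is only an illustration: the names ready, ready_cond, lock, wait_until_ready(), set_ready() and run_demo() are hypothetical and do not appear in the programs of this article.

```c
#include <pthread.h>

// Hypothetical names for illustration: 'ready' is the predicate,
// protected by 'lock' and signaled through 'ready_cond'
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER;
static int ready;

// Waiting side: block until the predicate becomes true
void wait_until_ready(void)
{
  pthread_mutex_lock(&lock);

  // Loop on the predicate: pthread_cond_wait() may return spuriously
  while (!ready) {
    // Releases the mutex while sleeping, re-acquires it before returning
    pthread_cond_wait(&ready_cond, &lock);
  }

  pthread_mutex_unlock(&lock);
}

// Signaling side: change the predicate under the mutex, then signal
void set_ready(void)
{
  pthread_mutex_lock(&lock);
  ready = 1;
  pthread_cond_signal(&ready_cond);
  pthread_mutex_unlock(&lock);
}

static void *waiter(void *arg)
{
  (void)arg;
  wait_until_ready();
  return NULL;
}

// Starts one waiting thread, sets the predicate and joins the thread.
// Returns 0 on success, -1 on error
int run_demo(void)
{
  pthread_t tid;

  if (pthread_create(&tid, NULL, waiter, NULL) != 0) {
    return -1;
  }

  set_ready();

  return (pthread_join(tid, NULL) == 0) ? 0 : -1;
}
```

Calling run_demo() from a main() function and linking with -lpthread is enough to try it. The point to retain is that the predicate is always tested in a while loop with the mutex held, which is the same pattern the producer() function below uses on the counter of readers.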

First implementation

Here is an example program in which several threads write and read a global shared resource. The producer() and consumer() functions receive a callback which respectively writes and reads the global resource. They share a mutex and a condition variable to maintain a counter of readers: several readers can read the resource at the same time, whereas only one writer modifies the resource at any moment, and only when there are no pending readers. To simplify the source file, the return codes of the services are not checked.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
typedef int (* write_op_t)(void *data);
typedef int (* read_op_t)(void *data);
int readers;

char resource[50];

int producer(write_op_t wr, void *data)
{
  int rc;

  pthread_mutex_lock(&mutex);

  // If there are readers ==> we wait
  while (readers > 0) {
    // Wait for a signal from the readers
    pthread_cond_wait(&cond, &mutex);
  }

  // The mutex is locked here

  // Write operation
  rc = (* wr)(data);

  // wake up any waiting writer
  pthread_cond_signal(&cond);

  pthread_mutex_unlock(&mutex);

  return rc;
}


int consumer(read_op_t rd, void *data)
{
  int rc;

  pthread_mutex_lock(&mutex);

  // One more reader
  readers ++;

  pthread_mutex_unlock(&mutex);

  // Read operation
  rc = (* rd)(data);

  pthread_mutex_lock(&mutex);

  // One less reader
  readers --;

  // Wake up any waiting writer
  if (readers == 0) {
    pthread_cond_signal(&cond);
  }

  pthread_mutex_unlock(&mutex);

  // Report the result of the read operation to the caller
  return rc;
}


int read_op(void *data)
{
  char *name = (char *)data;
  char local_resource[sizeof(resource) + 50];

  snprintf(local_resource, sizeof(local_resource), "readers=%d, resource='%s'", readers, resource);
  printf("%s: %s\n", name, local_resource);

  return 0;
}

int write_op(void *data)
{
  char *name = (char *)data;
  printf("%s writing\n", name);
  snprintf(resource, sizeof(resource), "%s", name);

  return 0;
}


void *entry(void *p)
{
  char *name = (char *)p;
  int i = 10;
  int rc;

  do {

    if (i & 0x1) {
      rc = producer(write_op, name);
    } else {
      rc = consumer(read_op, name);
    }

    i --;

  } while (i);

  printf("End of %s\n", name);

  return NULL;

}


int main(void)
{
  pthread_t tid[4];
  int i;
  char *name[4];

  for (i = 0; i < 4; i ++) {
    name[i] = malloc(20);
    snprintf(name[i], 20, "thread#%d", i);
    pthread_create(&(tid[i]), NULL, entry, name[i]);
  }

  for (i = 0; i < 4; i ++) {
    pthread_join(tid[i], NULL);
    free(name[i]);
  }

  return 0;
}

Under Linux, it is built like this:

$ gcc prod_cons_1.c -o prod_cons_1 -lpthread

The execution shows the competition between readers and writers:

$ ./prod_cons_1 
thread#0: readers=1, resource=''
thread#1: readers=2, resource=''
thread#2: readers=3, resource=''
thread#3: readers=4, resource=''
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
End of thread#1
thread#0 writing
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#0: readers=1, resource='thread#2'
thread#3: readers=2, resource='thread#2'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
End of thread#3
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
End of thread#0
thread#2 writing
End of thread#2
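
The program above ignores the return codes of the services to keep the listing short. As a hedged sketch of what checking them could look like: the pthread services return 0 on success and an error number on failure (they do not set errno). The helper check() and the function checked_increment() below are hypothetical names introduced only for this illustration.

```c
#include <pthread.h>
#include <stdio.h>

// Hypothetical helper: reports a failed pthread call on stderr
// and returns the error number unchanged (0 means success)
static int check(int rc, const char *what)
{
  if (rc != 0) {
    fprintf(stderr, "%s failed with error %d\n", what, rc);
  }

  return rc;
}

static pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
static int counter;

// Increments a shared counter, checking each pthread call.
// Returns 0 on success or the error number of the failing call
int checked_increment(void)
{
  int rc;

  rc = check(pthread_mutex_lock(&counter_mutex), "pthread_mutex_lock");
  if (rc != 0) {
    return rc;
  }

  counter ++;

  return check(pthread_mutex_unlock(&counter_mutex), "pthread_mutex_unlock");
}
```

The same wrapping could be applied to the calls to pthread_cond_wait(), pthread_cond_signal() and pthread_create() in the program above.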
Management of the starvation

The previous implementation gives more priority to the readers than to the writers: a reader can enter the critical section as soon as it gets the mutex, whereas a writer must wait for the counter of readers to drop to 0 before entering the critical section. So, the writer threads suffer starvation when many reader threads access the resource. To give the writer threads more chances to enter the critical section, the previous program can be slightly enhanced by adding a limit on the number of reader threads inside the critical section. The reader thread (in the consumer() function) checks the limit and waits on the condition variable if the counter of readers has reached it. In the following program, the limit is passed as a parameter to the program (the default is 1).

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
typedef int (* write_op_t)(void *data);
typedef int (* read_op_t)(void *data);
int readers;

char resource[50];


int max_readers = 1;


int producer(write_op_t wr, void *data)
{
  int rc;

  pthread_mutex_lock(&mutex);

  // If there are readers ==> we wait
  while (readers > 0) {
    // Wait for a signal from the readers
    pthread_cond_wait(&cond, &mutex);
  }

  // The mutex is locked here

  // Write operation
  rc = (* wr)(data);

  // Wake up any waiting writer/reader
  pthread_cond_signal(&cond);

  pthread_mutex_unlock(&mutex);

  return rc;
}


int consumer(read_op_t rd, void *data)
{
  int rc;

  pthread_mutex_lock(&mutex);

  // If the number of readers has reached the limit
  // wait for the counter to decrease
  while (readers >= max_readers) {
    // Wait for a signal from the readers or writers
    pthread_cond_wait(&cond, &mutex);
  }

  // The mutex is locked here

  // One more reader
  readers ++;

  pthread_mutex_unlock(&mutex);

  // Read operation
  rc = (* rd)(data);

  pthread_mutex_lock(&mutex);

  // One less reader
  readers --;

  // Wake up any waiting writer/reader
  if (readers == 0) {
    pthread_cond_signal(&cond);
  }

  pthread_mutex_unlock(&mutex);

  // Report the result of the read operation to the caller
  return rc;
}


int read_op(void *data)
{
  char *name = (char *)data;
  char local_resource[sizeof(resource) + 50];

  snprintf(local_resource, sizeof(local_resource), "readers=%d, resource='%s'", readers, resource);
  printf("%s: %s\n", name, local_resource);

  return 0;
}

int write_op(void *data)
{
  char *name = (char *)data;
  printf("%s writing\n", name);
  snprintf(resource, sizeof(resource), "%s", name);

  return 0;
}


void *entry(void *p)
{
  char *name = (char *)p;
  int i = 10;
  int rc;

  do {

    if (i & 0x1) {
      rc = producer(write_op, name);
    } else {
      rc = consumer(read_op, name);
    }

    i --;

  } while (i);

  printf("End of %s\n", name);

  return NULL;

}

int main(int ac, char *av[])
{
  pthread_t tid[4];
  int i;
  char *name[4];

  if (ac == 2) {
    max_readers = atoi(av[1]);
    // A limit lower than 1 would block every reader forever
    if (max_readers < 1) {
      max_readers = 1;
    }
  }

  for (i = 0; i < 4; i ++) {
    name[i] = malloc(20);
    snprintf(name[i], 20, "thread#%d", i);
    pthread_create(&(tid[i]), NULL, entry, name[i]);
  }

  for (i = 0; i < 4; i ++) {
    pthread_join(tid[i], NULL);
    free(name[i]);
  }

  return 0;
}

Under Linux, it is built and then run with a limit of 3 readers and with a limit of 1 reader:

$ gcc prod_cons_2.c -o prod_cons_2 -lpthread
$ ./prod_cons_2 3
thread#0: readers=1, resource=''
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
End of thread#0
thread#1: readers=1, resource='thread#0'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
End of thread#1
thread#2: readers=1, resource='thread#1'
thread#3: readers=2, resource='thread#1'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
End of thread#3
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
End of thread#2
$ ./prod_cons_2 1
thread#0: readers=1, resource=''
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
thread#0: readers=1, resource='thread#0'
thread#0 writing
End of thread#0
thread#1: readers=1, resource='thread#0'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#1: readers=1, resource='thread#1'
thread#1 writing
thread#2: readers=1, resource='thread#1'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#1: readers=1, resource='thread#2'
thread#1 writing
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
thread#2: readers=1, resource='thread#2'
thread#2 writing
End of thread#2
thread#1: readers=1, resource='thread#2'
thread#1 writing
End of thread#1
thread#3: readers=1, resource='thread#1'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
thread#3: readers=1, resource='thread#3'
thread#3 writing
End of thread#3
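
In the second program, readers and writers wait on the same condition variable with different predicates, and pthread_cond_signal() is only guaranteed to wake up at least one of the waiting threads, which may not be the one whose predicate has just become true. A possible variant, given here only as a sketch and not as the method of the programs above, is to call pthread_cond_broadcast() each time a reader leaves, so that every waiting thread re-checks its own predicate. The names reader_enter(), reader_leave(), writer_add(), run_workers() and value are hypothetical.

```c
#include <pthread.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int readers;          // Number of readers in the critical section
static int max_readers = 2;  // Limit on simultaneous readers (assumed >= 1)
static int value;            // Stand-in for the shared resource

// Reader entry: wait while the limit is reached
void reader_enter(void)
{
  pthread_mutex_lock(&mutex);
  while (readers >= max_readers) {
    pthread_cond_wait(&cond, &mutex);
  }
  readers ++;
  pthread_mutex_unlock(&mutex);
}

// Reader exit: a departure may unblock a reader (limit no longer
// reached) or a writer (counter back to 0), so all the waiters are
// woken up and each one re-checks its own predicate
void reader_leave(void)
{
  pthread_mutex_lock(&mutex);
  readers --;
  pthread_cond_broadcast(&cond);
  pthread_mutex_unlock(&mutex);
}

// Writer: wait until no reader is active, then modify the resource.
// In this sketch the writer never changes 'readers', so it does not
// signal the condition variable
void writer_add(int n)
{
  pthread_mutex_lock(&mutex);
  while (readers > 0) {
    pthread_cond_wait(&cond, &mutex);
  }
  value += n;
  pthread_mutex_unlock(&mutex);
}

static void *worker(void *arg)
{
  int i;

  (void)arg;
  for (i = 0; i < 1000; i ++) {
    reader_enter();
    reader_leave();
    writer_add(1);
  }

  return NULL;
}

// Runs 4 workers and returns the final value of the resource
int run_workers(void)
{
  pthread_t tid[4];
  int i;

  for (i = 0; i < 4; i ++) {
    pthread_create(&(tid[i]), NULL, worker, NULL);
  }
  for (i = 0; i < 4; i ++) {
    pthread_join(tid[i], NULL);
  }

  return value;
}
```

The trade-off is that a broadcast wakes up threads which may immediately go back to sleep, which costs some context switches. With only a few threads, as in the programs of this article, that cost is likely to be negligible.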
About the author

The author is a computer science engineer based in France. He can be contacted here.