The so-called "producer/consumer problem" involves multiple producer threads writing into a shared resource and multiple consumer threads reading from it. A synchronization is needed to make sure that:
Here is an example program which implements several threads writing/reading a global shared resource. A producer and a consumer function respectively receive a callback to write and read the global resource. They both use a condition variable to maintain a counter of readers to make possible multiple readers read the resource at the same time and make sure that only one writer modify the resource at any moment when there are no pending readers. To simplify the source file, the return codes of the services are not checked.
#include <pthread.h> #include <stdio.h> #include <stdlib.h> pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; typedef int (* write_op_t)(void *data); typedef int (* read_op_t)(void *data); int readers; char resource[50]; int producer(write_op_t wr, void *data) { int rc; pthread_mutex_lock(&mutex); // If there are readers ==> we wait while (readers > 0) { // Wait for a signal from the readers pthread_cond_wait(&cond, &mutex); } // The mutex is locked here // Write operation rc = (* wr)(data); // wake up any waiting writer pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); return rc; } int consumer(read_op_t rd, void *data) { int rc; pthread_mutex_lock(&mutex); // One more reader readers ++; pthread_mutex_unlock(&mutex); // Read operation rc = (* rd)(data); pthread_mutex_lock(&mutex); // One less reader readers --; // Wake up any waiting writer if (readers == 0) { pthread_cond_signal(&cond); } pthread_mutex_unlock(&mutex); } int read_op(void *data) { char *name = (char *)data; char local_resource[sizeof(resource) + 50]; snprintf(local_resource, sizeof(local_resource), "readers=%d, resource='%s'", readers, resource); printf("%s: %s\n", name, local_resource); return 0; } int write_op(void *data) { char *name = (char *)data; printf("%s writing\n", name); snprintf(resource, sizeof(resource), "%s", name); return 0; } void *entry(void *p) { char *name = (char *)p; int i = 10; int rc; do { if (i & 0x1) { rc = producer(write_op, name); } else { rc = consumer(read_op, name); } i --; } while (i); printf("End of %s\n", name); return NULL; } int main(void) { pthread_t tid[4]; int i; char *name[4]; for (i = 0; i < 4; i ++) { name[i] = malloc(20); snprintf(name[i], 20, "thread#%d", i); pthread_create(&(tid[i]), NULL, entry, name[i]); } for (i = 0; i < 4; i ++) { pthread_join(tid[i], NULL); free(name[i]); } return 0; } |
Under Linux, it is built like this:
$ gcc prod_cons_1.c -o prod_cons_1 -l prthread |
The execution shows the competition between readers and writers:
$ ./prod_cons_1 thread#0: readers=1, resource='' thread#1: readers=2, resource='' thread#2: readers=3, resource='' thread#3: readers=4, resource='' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing End of thread#1 thread#0 writing thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#0: readers=1, resource='thread#2' thread#3: readers=2, resource='thread#2' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing End of thread#3 thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing End of thread#0 thread#2 writing End of thread#2 |
The previous implementation gives more priority to the readers than the writers. Because a reader can enter the critical section as soon as it gets the mutex but the writer must wait for the counter of readers to be equal to 0 before entering into the critical section. So, the writer threads suffer starvation when many reader threads access the resource. To give more chances to the writer thread to enter the critical section, it is possible to slightly enhance the previous program by adding a limit on the number of reader threads entering the critical section. Hence, the reader thread (in the consumer() function) checks the limit and wait on the condition variable if the counter of readers has reached the limit. In the following program, we pass the limit as parameter to the program.
#include <pthread.h> #include <stdio.h> #include <stdlib.h> pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; typedef int (* write_op_t)(void *data); typedef int (* read_op_t)(void *data); int readers; char resource[50]; int max_readers = 1; int producer(write_op_t wr, void *data) { int rc; pthread_mutex_lock(&mutex); // If there are readers ==> we wait while (readers > 0) { // Wait for a signal from the readers pthread_cond_wait(&cond, &mutex); } // The mutex is locked here // Write operation rc = (* wr)(data); // wake up any waiting writer pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); return rc; } int consumer(read_op_t rd, void *data) { int rc; pthread_mutex_lock(&mutex); // If the number of readers has reached the limit // wait for the counter to decrease while (readers == max_readers) { // Wait for a signal from the readers or writers pthread_cond_wait(&cond, &mutex); } // The mutex is locked here // One more reader readers ++; pthread_mutex_unlock(&mutex); // Read operation rc = (* rd)(data); pthread_mutex_lock(&mutex); // One less reader readers --; // Wake up any waiting writer/reader if (readers == 0) { pthread_cond_signal(&cond); } pthread_mutex_unlock(&mutex); } int read_op(void *data) { char *name = (char *)data; char local_resource[sizeof(resource) + 50]; snprintf(local_resource, sizeof(local_resource), "readers=%d, resource='%s'", readers, resource); printf("%s: %s\n", name, local_resource); return 0; } int write_op(void *data) { char *name = (char *)data; printf("%s writing\n", name); snprintf(resource, sizeof(resource), "%s", name); return 0; } void *entry(void *p) { char *name = (char *)p; int i = 10; int rc; do { if (i & 0x1) { rc = producer(write_op, name); } else { rc = consumer(read_op, name); } i --; } while (i); printf("End of %s\n", name); return NULL; } int main(int ac, char *av[]) { pthread_t tid[4]; int i; char *name[4]; if (ac == 2) { max_readers = atoi(av[1]); } for (i = 0; i < 4; i ++) { name[i] = malloc(20); snprintf(name[i], 20, "thread#%d", i); pthread_create(&(tid[i]), NULL, entry, name[i]); } for (i = 0; i < 4; i ++) { pthread_join(tid[i], NULL); free(name[i]); } return 0; } |
$ gcc prod_cons_2.c -o prod_cons_2 -lpthread $ ./prod_cons_2 3 thread#0: readers=1, resource='' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing End of thread#0 thread#1: readers=1, resource='thread#0' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing End of thread#1 thread#2: readers=1, resource='thread#1' thread#3: readers=2, resource='thread#1' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing End of thread#3 thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing End of thread#2 $ ./prod_cons_2 1 thread#0: readers=1, resource='' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing thread#0: readers=1, resource='thread#0' thread#0 writing End of thread#0 thread#1: readers=1, resource='thread#0' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#1: readers=1, resource='thread#1' thread#1 writing thread#2: readers=1, resource='thread#1' thread#2 writing thread#2: readers=1, resource='thread#2' thread#1: readers=1, resource='thread#2' thread#1 writing thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing thread#2: readers=1, resource='thread#2' thread#2 writing End of thread#2 thread#1: readers=1, resource='thread#2' thread#1 writing End of thread#1 thread#3: readers=1, resource='thread#1' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing thread#3: readers=1, resource='thread#3' thread#3 writing End of thread#3 |
The author is an engineer in computer sciences located in France. He can be contacted here.