# 8.3. Producer-Consumer Problem¶

One of the most common task structures in concurrent systems is illustrated by the producer-consumer problem. In this problem, threads or processes are divided into two relative types: a producer thread is responsible for performing an initial task that ends with creating some result and a consumer thread that takes that initial result for some later task. Between the threads, there is a shared array or queue that stores the results being passed. One key feature of this problem is that the consumer removes the data from the queue and consumes it by using it in some later purpose. There is no way for the consumer thread or threads to repeatedly access data in the queue.

As an example, consider a researcher who discovers a previously unknown play and they believe it may have been written by a famous author. As part of their work, this researcher wants to know if the newly discovered text uses common words with the same frequency that author used in other works. This work could be done with a pipe-and-filter application that reads in the content of the texts, builds a search index of all words based on their frequency, performs some computation that compares the search indexes of all of the works. One thread is assigned the tasks of reading in the contents and producing the list of words, placing a word at a time in a queue. A second thread consumes the words by removing them from the queue and adding them to the data used to build the search index. This thread then becomes a producer because it provides the index to yet another thread that will be performing the index comparisons.

## 8.3.1. Producer-Consumer with Unbounded Queue¶

As a first variant on this problem, consider two threads that share an unbounded queue. This approach can be implemented using a linked list approach for the queue. Specifically, we can assume that there is a queue_t structure that contains pointers to the front and back of the queue, where the nodes in the queue are of type queue_node_t. The nodes all contain pointers to some sort of data_t field. Code Listing 8.10 shows the framework for enqueueing and dequeueing data.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 /* Code Listing 8.10: Enqueue and dequeue operations for a linked list implementation of a queue */ void enqueue_unsafe (queue_t *queue, data_t *data) { /* Create a new node and make it the new back of the queue */ queue->back->next = calloc (1, sizeof (queue_node_t)); assert (queue->back->next != null); queue->back = queue->back->next; queue->back->data = data; } data_t * dequeue_unsafe (queue_t *queue) { /* If back = front, then the queue is empty */ if (queue->back == queue->front) return NULL; data_t * data = queue->front->data; queue_node_t * next = queue->front->next; free (queue->front); queue->front = next; return data; } 

This implementation, which would be acceptable for a single-threaded application, has a race condition when used in a concurrent setting. Specifically, if one thread begins to enqueue() some data, another thread that tries to dequeue() the data at the same time may get a NULL pointer because the first thread has not yet reached the line the advances the back pointer.

The solution here would be to refactor these functions to become a monitor as shown in Code Listing 8.11. Each function would be passed a reference to a shared mutex that would be locked on entry and released just before the function returns. This solution eliminates the race condition regarding the timing of access to the queue’s back field. Additionally, this solution is generalizable regardless of the number of producers and consumers. If there are multiple producers trying to enqueue() data, the mutex ensures that they will not try to manipulate the queue at that same time. For brevity, Code Listing 8.11 calls the functions from Code Listing 8.10.

/* Code Listing 8.11:
A synchronized version of linked list enqueueing and dequeueing
*/

void
enqueue (queue_t *queue, data_t *data, pthread_mutex_t *lock)
{
enqueue_unsafe (queue, data);
}

data_t *
dequeue (queue_t *queue, pthread_mutex_t *lock)
{
data_t * data = dequeue_unsafe (queue);
return data;
}


## 8.3.2. Single Producer-Single Consumer Solution Using a Bounded Queue¶

In many cases, the unbounded queue of the previous section is neither feasible nor desirable. For instance, this approach makes it very easy to launch a denial-of-service attack against the application, particularly if this code is used in a server. Code Listing 8.12 shows a single line of code that quickly exhausts the program’s dynamic memory resources, leading to a system crash.

 1 2 3 4 5 6 /* Code Listing 8.12: A trivial denial-of-service attack against the code in 8.11 */ while (1) enqueue (queue, data, lock); 

Given this weakness, systems software typically imposes constraints on the number of items in the queue. One approach would be to use semaphores with the linked list queue above. Another approach is to use a finite circular array as the basis. For this approach, we will still assume the use of a queue_t data type to represent the queue, however the internal implementation uses an array instead of linked nodes. Figure 8.3.1 illustrates the structure of the queue.

Figure 8.3.1: A circular queue using an array

Code Listing 8.13 shows the unsafe version of the enqueue() and dequeue() operations for an array implementation of a queue. Within the queue_t data type, contents is an array of pointers to the enqueued data, while front and back are indexes into this array. When either of the indexes are incremented, the operations must recalculate the value modulo the array size to ensure that the values stay within the bounds of the array. This approach creates a circular structure, where the spaces in the array can be reused. Once the back index reaches the end of the array, it would return to the first position and continue from there.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 /* Code Listing 8.13: Enqueue and dequeue operations for an array queue implementation */ void enqueue_unsafe (queue_t *queue, data_t *data) { /* Store the data in the array and advance the index */ queue->contents[queue->back++] = data; queue->back %= queue->capacity; } data_t * dequeue_unsafe (queue_t *queue) { data_t * data = queue->contents[queue->front++]; queue->front %= queue->capacity; return data; } 

The implementation in Code Listing 8.13 has a fundamental flaw in both operations, as neither enforces the limited capacity. This design choice was intentional, as this code was meant to encapsulate the enqueueing and dequeueing behavior. That is, we can now reason through the synchronization issues for the producer-consumer problem without regard for the specific queue implementation.

To build the rationale for the solution to the producer-consumer problem, consider Code Listing 8.14. This implementation attempts to keep track of the number of items (queue->size) in the queue, comparing it with values that indicate whether the queue is full or empty.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 /* Code Listing 8.14: An unsuccessful attempt at solving producer-consumer */ void enqueue_failure (queue_t *queue, data_t *data) { if (queue->counter == queue->capacity) return; enqueue_unsafe (queue, data); queue->counter++; } data_t * dequeue_failure (queue_t * queue) { if (queue->counter == 0) return NULL; data_t * data = dequeue_unsafe (queue); queue->counter--; return data; } 

The attempted solution in Code Listing 8.14 fails because it has race conditions on the queue’s counter variable. If the producer is attempting to enqueue an item at the same moment the consumer is dequeueing one, the outcome depends on whether the producer’s check for available space happens before or after the consumer decrements the counter. A similar race condition arises if the queue is empty and both functions are called. This race condition could be fixed by wrapping the accesses to the counter with a mutex.

Code Listing 8.14 has another flaw from the producer’s perspective: there is no indication that there was a failure to enqueue the item. Addressing this failure would require changing the function’s interface to return a status to indicate success or failure. This approach, however, imposes an undesirable burden on the user. That is, the programmer who is building the producer and consumer threads or processes must build in a fail-safe mechanism to respond accordingly if the enqueue() or dequeue() operations fail.

A better, more user-friendly solution to the producer-consumer problem is to use semaphores. Note that semaphores incorporate the key functionality that we need: atomic incrementing and decrementing of a counter variable. The previous approach was trying to re-invent this already-solved problem. Code Listing 8.15 shows the framework for solving the producer-consumer problem for a single producer and single consumer.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 /* Code Listing 8.15: Solution for a single producer and single consumer */ void enqueue (queue_t *queue, data_t *data, sem_t *space, sem_t *items) { sem_wait (space); enqueue_unsafe (queue, data); sem_post (items); } data_t * dequeue (queue_t * queue, sem_t *space, sem_t *items) { sem_wait (items); data_t * data = dequeue_unsafe (queue); sem_post (space); return data; } 

The structure of this approach is to use signaling with two semaphores. The key insight here is that there are actually two bounds that need enforced: a maximum and a minimum. If the queue is full, then the producer needs to wait until there is a space available. That is, the producer must call sem_wait(space) and wait if necessary. The space semaphore is initialized to the capacity of the queue, so it would only be 0 if that many items are already in the queue. Once the consumer has removed an item from the queue, it performs the counterpart call sem_post(space) to alert the producer that a space is available. In short, the space semaphore enforces the maximum capacity of the queue.

At the same time, the consumer must not attempt to remove an item from the queue if none have been enqueued. Depending on the internal state of the queue, this attempt to dequeue an item might return old data that has previously be removed or an unexpected NULL pointer. The items semaphore, which is initialized to 0 and represents the number of enqueued items, enforces this constraint. If the queue is empty, this semaphore would have an internal value of 0, causing the consumer to block when it calls sem_wait(items). Once the producer puts an item into the queue, it would perform the corresponding sem_post(items) that would increment the semaphore and unblock the consumer.

## 8.3.3. Multiple Producers Solution Using a Bounded Queue¶

The solution in the previous section works successfully if there is only a single producer. However, if there are multiple producers, the previous solution will not work and should not be used. The problem is the line highlighted in Code Listing 8.16. Recall that post-increments in C are not atomic operations. That is, the internal execution at the machine language level involves a three-step process: load the value into a register, increment the value in the register, and store the result back into memory. If two threads try to perform this increment at the same time, the increments could interfere with each other.

 1 2 3 4 5 /* Code Listing 8.16: Non-atomic increments are always race conditions */ queue->contents[queue->back++] = data; 

The solution in this case would be to add a lock as shown in Code Listing 8.17. Acquiring and releasing the lock as shown here minimizes the size of the critical section. That is, the operations on the semaphores do not require protection. Including the calls to sem_wait() and sem_post() within the critical section for the lock would unnecessarily prevent the other producers from manipulating the semaphores, partially eliminating the benefit of using a semaphore.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 /* Code Listing 8.17: Solution for a single producer and single consumer */ void enqueue (queue_t *queue, data_t *data, sem_t *space, sem_t *items, pthread_mutex_t *lock) { sem_wait (space); pthread_mutex_lock (lock); enqueue_unsafe (queue, data); pthread_mutex_unlock (lock); sem_post (items); } 

Note that this approach does not require any modification to the dequeue() operation used by the consumer. Since there is only a single consumer, there is no race condition on incrementing the queue->front variable. This solution could be extended to multiple consumers by introducing a second lock for consumers used in dequeue() as shown in Code Listing 8.18. It is important, in this case, to use separate locks rather than simply reusing the same lock for both producers and consumers. When both the space and items semaphores have positive values, then the front and back of the queue are guaranteed to be in different places. That is, there are multiple items in the queue, so the producer and consumer within the critical sections would not be interfering with each other. Consequently, there is no reason that the producer needs to lock out the consumer, or vice versa.

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 /* Code Listing 8.18: Solution for a multiple producers and consumers */ void enqueue (queue_t *queue, data_t *data, sem_t *space, sem_t *items, pthread_mutex_t *producer_lock) { sem_wait (space); pthread_mutex_lock (producer_lock); enqueue_unsafe (queue, data); pthread_mutex_unlock (producer_lock); sem_post (items); } data_t * dequeue (queue_t * queue, sem_t *space, sem_t *items, pthread_mutex_t *consumer_lock) { sem_wait (items); pthread_mutex_lock (consumer_lock); data_t * data = dequeue_unsafe (queue); pthread_mutex_unlock (consumer_lock); sem_post (space); return data; }