«  3.5. POSIX vs. System V IPC   ::   Contents   ::   3.7. Shared Memory  »

3.6. Message Passing With Message Queues

Message queues allow processes to communicate by exchanging structured messages. As with pipes and FIFOs, processes can message queues follow a message passing IPC model for exchanging data in discrete messages. However, message queues differ from pipes in several important ways:

  • While pipes send unstructured byte streams, messages are sent as distinct units. When a message is retrieved from the queue, the process receives exactly one message in its entirety; there is no way to retrieve part of a message and leave the rest in the queue.
  • Message queues use special identifiers, rather than file descriptors. As a result, message queues require special functions for sending and receiving data, rather than the standard file interface functions.
  • Message queues have associated metadata that allows processes to specify the order in which messages are received. That is, message queues do not require or guarantee a first-in, first-out ordering.
  • Message queues have kernel-level persistence and use special functions or utilities for removing them. Killing the process will not remove the message queue.
Decorative bug warning

Bug Warning


It is important to understand the difference between messages and byte streams. Assume that one process has made 100 calls to a function to transmit a single byte at a time. If pipes or FIFOs are used, all of this data can be retrieved with a single call to read() that requests 100 bytes. In the case of message queues, each byte is a distinct message that must be retrieved individually. There is no short-cut to retrieve all of the data at once.

Now, assume that the sending process sent all 100 bytes with one function call. If the processes are using a pipe or FIFO and the receiver requests only 50 bytes with read(), that process will retrieve exactly 50 bytes. However, if message queues are used, the bytes would be converted into a single message; the size of the message is determined based on the parameters used to set up the message queue. For instance, the default message size for POSIX message queues in Linux is 8192 bytes. If the receiving process requests less than this (such as requesting only 50 bytes), it will receive nothing. Ignoring the difference between a message and a byte stream can lead to unexpected behavior that causes unanticipated failures.

POSIX message queues also provide a number of key features that are not available in other interfaces, such as System V:

  • A POSIX message queue is only removed once it is closed by all processes currently using it.
  • POSIX message queues include an asynchronous notification feature that allows processes to alerted when a message is available.
  • POSIX message metadata includes an associated priority level. The first message retrieved is always the one with the highest priority level.
  • POSIX message queues allow application designers to specify attributes (such as message size or capacity of the queue) via optional parameters passed when opening the queue.

Figure 3.6.2 illustrates a key point about POSIX message queues. In this example, assume that the messages A, B, C, D, and E have been inserted (in that order). POSIX message queues use non-negative values for the priority, with 0 being lowest. As such, messages C and D are at the front of the queue because they have the highest priority. Within a single priority level, though, the queue uses a first-in, first-out ordering.

POSIX message queues are priority-based, with 0 as the lowest priority

Figure 3.6.2: POSIX message queues are priority-based, with 0 as the lowest priority

3.6.1. POSIX Message Queues

There are six functions typically used for setting up and using POSIX message queues.

Decorative C library image

C library functions – <mqueue.h>


mqd_t mq_open (const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
Open (and possibly create) a POSIX message queue.
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
Get the attributes associated with a given message queue.
int mq_close (mqd_t mqdes);
Close a message queue.
int mq_unlink (const char *name);
Initiate deletion of a message queue.
int mq_send (mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
Send the message pointed to by msg_ptr with priority msg_prio.
ssize_t mq_receive (mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
Receive a message into a buffer pointed to by msg_ptr and get its priority msg_prio.

As with other POSIX IPC open functions, mq_open() includes both an oflag and mode parameter. In addition, the fourth parameter (attr) is used to specify attributes about the message queue, such as the message size or the capacity of the queue. Note that calls to mq_open() must have exactly 2 or 4 parameters. Both the mode and attr parameters must be included when creating a new message queue. When opening a connection to an existing message queue, both of these parameters are omitted. To use the system defaults when setting up a message queue, pass NULL as the fourth parameter to mq_open(). Code Listing 3.7 creates a message queue for writing and sends a simple message.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/* Code Listing 3.7:
   Sending "HELLO" through a POSIX message queue
 */

/* Create and open a message queue for writing */
mqd_t mqd = mq_open ("/OpenCSF_MQ", O_CREAT | O_EXCL | O_WRONLY,  0600, NULL);

/* Ensure the creation was successful */
if (mqd == -1)
  {
    perror ("mq_open");
    exit (1);
  }

/* Send "HELLO" as a message with priority 10, then close the queue.
   Note the size is 6 to include the null byte '\0'. */
mq_send (mqd, "HELLO", 6, 10);
mq_close (mqd);

Code Listing 3.8 illustrates a standard approach for retrieving a message from the queue. The msg_len parameter for receiving messages requires special attention. The use of this parameter for mq_send() is intuitive and matches the behavior of functions like write() and strncpy(): msg_len specifies the maximum number of characters in the string identified by msg_ptr that will be sent. However, when receiving messages, msg_len must match the size of a message. As such, the standard approach is to use mq_getattr() and access the mq_msgsize field of the struct mq_attr returned.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* Code Listing 3.8:
   Opening a POSIX message queue and retrieving a message
 */

/* Open the message queue for reading */
mqd_t mqd = mq_open ("/OpenCSF_MQ", O_RDONLY);
assert (mqd != -1);

/* Get the message queue attributes */
struct mq_attr attr;
assert (mq_getattr (mqd, &attr) != -1);

char *buffer = calloc (attr.mq_msgsize, 1);
assert (buffer != NULL);

/* Retrieve message from the queue and get its priority level */
unsigned int priority = 0;
if ((mq_receive (mqd, buffer, attr.mq_msgsize, &priority)) == -1)
  printf ("Failed to receive message\n");
else
  printf ("Received [priority %u]: '%s'\n", priority, buffer);

/* Clean up the allocated memory and message queue */
free (buffer);
buffer = NULL;
mq_close (mqd);
Decorative bug warning

Bug Warning


In many instances, it is common practice to create a statically sized buffer as a local variable (such as char buffer[100];) and use the buffer size when reading data from some other source. However, the following line of code will not work with message queues:

int bytes_read = mq_receive (mqid, buffer, 100, &prio);

The reason for this is that the msg_len parameter does not exactly match the message queue’s message size (a common default is 8192, but it depends on the system). As such, the message queue will interpret this as a request to read less than a full message and will return nothing.

Unlike pipes, message queues can be used to send struct instances, even if some of the fields contain the value 0. (Pipes treat a byte value of 0 as the null byte and stop at that point.) Consider the following trivial struct declaration:

1
2
3
4
5
struct message {
  int x;
  char y;
  long z;
};

As shown in Code Listing 3.9, sending the message with POSIX message queues works almost exactly as shown previously with a char array. The only differences are the casting of the msg_ptr pointer and that the msg_len parameter is based on the size of the struct. Similarly, the only difference with reading is the casting of the msg_ptr to the struct type.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
/* Code Listing 3.9:
   Retrieving a structured POSIX message
 */

struct message msg;
msg.x = 0;
msg.y = 'q';
msg.z = -1;

/* Sending a struct works identically to a char array */
mq_send (mqd, (const char *)&msg, sizeof (struct message), 10);

/* When reading, use a char* buffer and explicitly cast */
if ((mq_receive (mqd, buffer, attr.mq_msgsize, &prio)) != -1)
  {
    struct message *msg = (struct message *)buffer;
    /* Retrieve message fields here */
  }

The default behavior for POSIX message queues is to perform blocking I/O when writing to a full queue (or reading from an empty one). If this behavior is undesirable, there are three alternatives that can be used. The first is to include the O_NONBLOCK option in the oflag bit mask to open the queue in non-blocking mode. If the queue is ever full, mq_send() will return an error without blocking. The other option is to use mq_timedsend() and mq_timedreceive(), which use an additional parameter (abs_timeout) to specify the maximum amount of time to wait when blocked. Finally, rather than attempting to retrieve a message that may not have been sent, a process can use mq_notify() to request an asynchronous notification that a message has been sent. Interested readers should consult the language documentation for more information on these functions.

Decorative C library image

C library functions – <mqueue.h>


int mq_timedsend (mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
Try to send a message, but specify a maximum time limit (abs_timeout) for blocking.
ssize_t mq_timedreceive (mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
Try to receive a message, but specify a maximum time limit (abs_timeout) for blocking.
int mq_notify (mqd_t mqdes, const struct sigevent *notification);
Request asynchronous notification when a message is placed in the queue.
«  3.5. POSIX vs. System V IPC   ::   Contents   ::   3.7. Shared Memory  »

Contact Us License