Message queues allow processes to communicate by exchanging structured messages. As with pipes and FIFOs, message queues follow a message passing IPC model, exchanging data in discrete messages. However, message queues differ from pipes in several important ways:
- While pipes send unstructured byte streams, messages are sent as distinct units. When a message is retrieved from the queue, the process receives exactly one message in its entirety; there is no way to retrieve part of a message and leave the rest in the queue.
- Message queues use special identifiers, rather than file descriptors. As a result, message queues require special functions for sending and receiving data, rather than the standard file interface functions.
- Message queues have associated metadata that allows processes to specify the order in which messages are received. That is, message queues do not require or guarantee a first-in, first-out ordering.
- Message queues have kernel-level persistence and use special functions or utilities for removing them. Killing the process will not remove the message queue.
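The persistence described in the last point can be observed within a single process. The following is a minimal sketch rather than a definitive implementation: the function name, the queue name, and the small attribute values are assumptions chosen for illustration. It sends a message, closes the queue, reopens it, and checks that the message is still there.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Returns 1 if a message sent before mq_close() can still be received after
   the queue is reopened, 0 if it cannot, and -1 if setup fails. */
int
message_survives_close (void)
{
  /* Hypothetical queue name; the PID keeps concurrent runs apart */
  char name[64];
  snprintf (name, sizeof (name), "/persist_demo_%ld", (long) getpid ());

  /* Small explicit limits so that a local array can hold any message */
  struct mq_attr attr = { .mq_maxmsg = 4, .mq_msgsize = 32 };
  char buffer[32] = { 0 };
  assert (sizeof (buffer) >= (size_t) attr.mq_msgsize);

  mqd_t mqd = mq_open (name, O_CREAT | O_EXCL | O_WRONLY, 0600, &attr);
  if (mqd == (mqd_t) -1)
    return -1;

  /* Send one message, then close the only open descriptor */
  int rc = mq_send (mqd, "ping", 5, 0);
  mq_close (mqd);
  if (rc == -1)
    {
      mq_unlink (name);
      return -1;
    }

  /* The queue and its contents are still held by the kernel */
  mqd = mq_open (name, O_RDONLY | O_NONBLOCK);
  if (mqd == (mqd_t) -1)
    {
      mq_unlink (name);
      return -1;
    }
  ssize_t len = mq_receive (mqd, buffer, sizeof (buffer), NULL);
  mq_close (mqd);

  /* mq_unlink() is the special function that finally removes the queue */
  mq_unlink (name);
  return (len == 5 && strcmp (buffer, "ping") == 0) ? 1 : 0;
}
```

On systems with an older C library, programs that use these functions may need to be linked with -lrt.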
Bug Warning
It is important to understand the difference between messages and byte streams. Assume that one process has made 100 calls to a function to transmit a single byte at a time. If pipes or FIFOs are used, all of this data can be retrieved with a single call to read() that requests 100 bytes. In the case of message queues, each byte is a distinct message that must be retrieved individually. There is no short-cut to retrieve all of the data at once.
Now, assume that the sending process sent all 100 bytes with one function call. If the processes are using a pipe or FIFO and the receiver requests only 50 bytes with read(), that process will retrieve exactly 50 bytes, and the remaining 50 bytes stay in the pipe for a later call. However, if message queues are used, the 100 bytes form a single message that can only be retrieved as a whole. The receiver must supply a buffer that is at least as large as the queue's maximum message size, which is determined by the parameters used to set up the message queue. For instance, the default maximum message size for POSIX message queues in Linux is 8192 bytes. If the receiving process requests less than this (such as requesting only 50 bytes), the call fails with an error (EMSGSIZE) and the process receives nothing. Ignoring the difference between a message and a byte stream can lead to unexpected behavior that causes unanticipated failures.
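The first scenario in this warning can be checked with a short experiment. The sketch below is illustrative only: the function names, queue name, and attribute values are assumptions, and the 100 transfers are scaled down to COUNT so that the queue fits within typical default limits. One function counts the bytes returned by a single read() on a pipe; the other counts how many mq_receive() calls are needed to drain a queue.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <unistd.h>

#define COUNT 8   /* scaled down from the 100 transfers in the text */

/* Writes COUNT single bytes to a pipe and returns the number of bytes that
   one read() call retrieves, or -1 on error. */
ssize_t
bytes_from_one_read (void)
{
  int fds[2];
  if (pipe (fds) == -1)
    return -1;

  ssize_t result = 0;
  for (int i = 0; i < COUNT && result == 0; i++)
    if (write (fds[1], "x", 1) != 1)
      result = -1;

  /* The pipe kept no record of where one write() ended and the next began */
  char buffer[COUNT];
  if (result == 0)
    result = read (fds[0], buffer, sizeof (buffer));

  close (fds[0]);
  close (fds[1]);
  return result;
}

/* Sends COUNT one-byte messages and returns the number of mq_receive()
   calls needed to drain the queue, or -1 on error. */
int
receives_needed (void)
{
  /* Hypothetical queue name; the PID keeps concurrent runs apart */
  char name[64];
  snprintf (name, sizeof (name), "/stream_demo_%ld", (long) getpid ());

  struct mq_attr attr = { .mq_maxmsg = COUNT, .mq_msgsize = 16 };
  char buffer[16];
  assert (sizeof (buffer) >= (size_t) attr.mq_msgsize);

  mqd_t mqd = mq_open (name, O_CREAT | O_EXCL | O_RDWR | O_NONBLOCK,
                       0600, &attr);
  if (mqd == (mqd_t) -1)
    return -1;
  mq_unlink (name);   /* the queue itself lasts until mq_close() below */

  for (int i = 0; i < COUNT; i++)
    if (mq_send (mqd, "x", 1, 0) == -1)
      {
        mq_close (mqd);
        return -1;
      }

  /* Each call returns exactly one 1-byte message even though the buffer has
     room for more; the loop ends once the queue is empty */
  int calls = 0;
  while (mq_receive (mqd, buffer, sizeof (buffer), NULL) == 1)
    calls++;

  mq_close (mqd);
  return calls;
}
```

Under these assumptions, bytes_from_one_read() returns COUNT because the pipe merges the writes into one stream, while receives_needed() also returns COUNT because every message requires its own call.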
POSIX message queues also provide a number of key features that are not available in other interfaces, such as System V:
- After a POSIX message queue has been unlinked, it is only removed once it has been closed by all processes currently using it.
- POSIX message queues include an asynchronous notification feature that allows processes to be alerted when a message is available.
- POSIX message metadata includes an associated priority level. The first message retrieved is always the one with the highest priority level.
- POSIX message queues allow application designers to specify attributes (such as message size or capacity of the queue) via optional parameters passed when opening the queue.
Figure 3.6.2 illustrates a key point about POSIX message queues. In this example, assume that the messages A, B, C, D, and E have been inserted (in that order). POSIX message queues use non-negative values for the priority, with 0 being lowest. As such, messages C and D are at the front of the queue because they have the highest priority. Within a single priority level, though, the queue uses a first-in, first-out ordering.
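The ordering rule can be reproduced in a few lines. This is a sketch under stated assumptions: the text only says that C and D have the highest priority, so the specific values below (10 for A and B, 30 for C and D, 20 for E) are hypothetical, as are the function and queue names.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Sends messages A through E, then stores the order in which they are
   received in `order` as a string. Returns 0 on success, -1 on error. */
int
receive_order (char order[6])
{
  /* Hypothetical priorities: the text only says that C and D are highest */
  static const struct { char label; unsigned int prio; } msgs[5] = {
    { 'A', 10 }, { 'B', 10 }, { 'C', 30 }, { 'D', 30 }, { 'E', 20 }
  };

  /* Hypothetical queue name; the PID keeps concurrent runs apart */
  char name[64];
  snprintf (name, sizeof (name), "/prio_demo_%ld", (long) getpid ());

  struct mq_attr attr = { .mq_maxmsg = 5, .mq_msgsize = 8 };
  char buffer[8];
  assert (sizeof (buffer) >= (size_t) attr.mq_msgsize);

  mqd_t mqd = mq_open (name, O_CREAT | O_EXCL | O_RDWR | O_NONBLOCK,
                       0600, &attr);
  if (mqd == (mqd_t) -1)
    return -1;
  mq_unlink (name);   /* the queue itself lasts until mq_close() below */

  /* Insert in the order A, B, C, D, E */
  for (int i = 0; i < 5; i++)
    if (mq_send (mqd, &msgs[i].label, 1, msgs[i].prio) == -1)
      {
        mq_close (mqd);
        return -1;
      }

  /* Retrieve all five messages; the queue decides the order */
  memset (order, 0, 6);
  for (int i = 0; i < 5; i++)
    {
      if (mq_receive (mqd, buffer, sizeof (buffer), NULL) != 1)
        {
          mq_close (mqd);
          return -1;
        }
      order[i] = buffer[0];
    }

  mq_close (mqd);
  return 0;
}
```

With these priorities the result is "CDEAB": C and D come first in their insertion order, then E, then A and B in their insertion order.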
There are six functions typically used for setting up and using POSIX message queues.
C library functions – <mqueue.h>
mqd_t mq_open (const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_close (mqd_t mqdes);
int mq_unlink (const char *name);
int mq_send (mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
ssize_t mq_receive (mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
As with other POSIX IPC open functions, mq_open() includes both an oflag and mode parameter. In addition, the fourth parameter (attr) is used to specify attributes about the message queue, such as the message size or the capacity of the queue. Note that calls to mq_open() must have exactly 2 or 4 parameters. Both the mode and attr parameters must be included when creating a new message queue. When opening a connection to an existing message queue, both of these parameters are omitted. To use the system defaults when setting up a message queue, pass NULL as the fourth parameter to mq_open(). Code Listing 3.7 creates a message queue for writing and sends a simple message.
/* Code Listing 3.7:
   Sending "HELLO" through a POSIX message queue
 */

/* Create and open a message queue for writing */
mqd_t mqd = mq_open ("/OpenCSF_MQ", O_CREAT | O_EXCL | O_WRONLY, 0600, NULL);

/* Ensure the creation was successful */
if (mqd == -1)
  {
    perror ("mq_open");
    exit (1);
  }

/* Send "HELLO" as a message with priority 10, then close the queue.
   Note the size is 6 to include the null byte '\0'. */
mq_send (mqd, "HELLO", 6, 10);
mq_close (mqd);
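When the system defaults are not suitable, the fourth parameter can carry explicit limits instead of NULL. The following sketch shows one way this might look; the function name, queue name, and chosen values are assumptions for illustration. It creates a queue with the requested capacity and message size, then reads the attributes back with mq_getattr().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <unistd.h>

/* Creates a queue that holds up to `maxmsg` messages of up to `msgsize`
   bytes each, and copies the attributes the system reports into `actual`.
   Returns 0 on success, -1 on error. */
int
create_with_attributes (long maxmsg, long msgsize, struct mq_attr *actual)
{
  assert (actual != NULL);

  /* Hypothetical queue name; the PID keeps concurrent runs apart */
  char name[64];
  snprintf (name, sizeof (name), "/attr_demo_%ld", (long) getpid ());

  /* mq_open() only consults the mq_maxmsg and mq_msgsize fields */
  struct mq_attr requested = { .mq_maxmsg = maxmsg, .mq_msgsize = msgsize };

  /* Creating a queue uses the four-parameter form of mq_open() */
  mqd_t mqd = mq_open (name, O_CREAT | O_EXCL | O_RDONLY, 0600, &requested);
  if (mqd == (mqd_t) -1)
    return -1;   /* for example, a value that is zero or above system limits */

  /* Ask the system which limits the new queue actually has */
  int rc = mq_getattr (mqd, actual);

  mq_close (mqd);
  mq_unlink (name);
  return rc;
}
```

A call such as create_with_attributes (4, 256, &attr) should report mq_maxmsg as 4 and mq_msgsize as 256, with mq_curmsgs at 0 because nothing has been sent yet.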
Code Listing 3.8 illustrates a standard approach for retrieving a message from the queue. The msg_len parameter for receiving messages requires special attention. The use of this parameter for mq_send() is intuitive and matches the behavior of functions like write(): msg_len specifies the number of bytes, starting at msg_ptr, that will be sent as the message. However, when receiving messages, msg_len must be at least as large as the queue's maximum message size, no matter how short the incoming message is. As such, the standard approach is to use mq_getattr() and access the mq_msgsize field of the struct mq_attr that it fills in.
/* Code Listing 3.8:
   Opening a POSIX message queue and retrieving a message
 */

/* Open the message queue for reading */
mqd_t mqd = mq_open ("/OpenCSF_MQ", O_RDONLY);
assert (mqd != -1);

/* Get the message queue attributes; the call is kept outside of
   assert() so that it still runs when NDEBUG is defined */
struct mq_attr attr;
int result = mq_getattr (mqd, &attr);
assert (result != -1);

/* Allocate a zeroed buffer large enough for the largest message */
char *buffer = calloc (attr.mq_msgsize, 1);
assert (buffer != NULL);

/* Retrieve message from the queue and get its priority level */
unsigned int priority = 0;
if ((mq_receive (mqd, buffer, attr.mq_msgsize, &priority)) == -1)
  printf ("Failed to receive message\n");
else
  printf ("Received [priority %u]: '%s'\n", priority, buffer);

/* Clean up the allocated memory and message queue */
free (buffer);
buffer = NULL;
mq_close (mqd);
Bug Warning
In many instances, it is common practice to create a statically sized buffer as a local variable (such as char buffer[100];) and use the buffer size when reading data from some other source. However, the following line of code will not work with message queues that use the default message size:
ssize_t bytes_read = mq_receive (mqd, buffer, 100, &prio);
The reason for this is that the msg_len parameter is smaller than the message queue's maximum message size (a common default is 8192, but it depends on the system). As such, the message queue will interpret this as a request to read less than a full message and the call will fail: mq_receive() returns -1, sets errno to EMSGSIZE, and leaves the message in the queue.
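The failure is easy to reproduce on a small queue. In this sketch the function name, queue name, and the 128-byte maximum message size are assumptions chosen for illustration. A 6-byte message is queued, and mq_receive() is then called with a caller-supplied msg_len.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <unistd.h>

#define MSGSIZE 128   /* hypothetical mq_msgsize for the demo queue */

/* Queues one short message, then calls mq_receive() with msg_len set to
   buf_len. Returns 0 if the receive succeeds, the errno value if it fails,
   and -1 if the queue cannot be set up. */
int
receive_with_length (size_t buf_len)
{
  char buffer[2 * MSGSIZE];
  assert (buf_len <= sizeof (buffer));

  /* Hypothetical queue name; the PID keeps concurrent runs apart */
  char name[64];
  snprintf (name, sizeof (name), "/size_demo_%ld", (long) getpid ());

  struct mq_attr attr = { .mq_maxmsg = 2, .mq_msgsize = MSGSIZE };
  mqd_t mqd = mq_open (name, O_CREAT | O_EXCL | O_RDWR | O_NONBLOCK,
                       0600, &attr);
  if (mqd == (mqd_t) -1)
    return -1;
  mq_unlink (name);   /* the queue itself lasts until mq_close() below */

  /* The message is only 6 bytes long, far below either length tested */
  if (mq_send (mqd, "HELLO", 6, 10) == -1)
    {
      mq_close (mqd);
      return -1;
    }

  /* What matters is msg_len versus mq_msgsize, not the message length */
  int result = 0;
  errno = 0;
  if (mq_receive (mqd, buffer, buf_len, NULL) == -1)
    result = errno;

  mq_close (mqd);
  return result;
}
```

Under these assumptions, receive_with_length (100) returns EMSGSIZE even though the message would fit in 100 bytes, while receive_with_length (MSGSIZE) returns 0.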
Message queues are not restricted to strings; they can also be used to send struct instances, even if some of the fields contain the value 0. (Both mq_send() and mq_receive() work with an explicit byte count, so a byte value of 0 is not treated as a terminating null byte.) Consider the following trivial struct declaration:
struct message {
  int x;
  char y;
  long z;
};
As shown in Code Listing 3.9, sending the message with POSIX message queues works almost exactly as shown previously with a char array. The only differences are the casting of the msg_ptr pointer and that the msg_len parameter is based on the size of the struct. Similarly, the only difference with reading is the casting of the msg_ptr to the struct type.
/* Code Listing 3.9:
   Sending and retrieving a structured POSIX message
 */

struct message msg;
msg.x = 0;
msg.y = 'q';
msg.z = -1;

/* Sending a struct works identically to a char array */
mq_send (mqd, (const char *)&msg, sizeof (struct message), 10);

/* When reading, use a char* buffer and explicitly cast; the return
   value confirms that a complete struct was received */
if (mq_receive (mqd, buffer, attr.mq_msgsize, &prio)
    == (ssize_t) sizeof (struct message))
  {
    struct message *received = (struct message *)buffer;
    /* Retrieve message fields here, such as received->y */
  }
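A complete round trip may make the pattern easier to test in isolation. The sketch below is one possible arrangement, not the only one: the function and queue names are hypothetical, the maximum message size is set to exactly sizeof (struct message), and memcpy() is used in place of a pointer cast so that the result does not depend on the alignment of the receive buffer.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

struct message {
  int x;
  char y;
  long z;
};

/* Sends `*in` through a message queue and stores the received copy in
   `*out`. Returns 0 on success, -1 on error. */
int
round_trip (const struct message *in, struct message *out)
{
  assert (in != NULL && out != NULL);

  /* Hypothetical queue name; the PID keeps concurrent runs apart */
  char name[64];
  snprintf (name, sizeof (name), "/struct_demo_%ld", (long) getpid ());

  /* Each message holds exactly one struct */
  struct mq_attr attr = { .mq_maxmsg = 1,
                          .mq_msgsize = sizeof (struct message) };
  mqd_t mqd = mq_open (name, O_CREAT | O_EXCL | O_RDWR | O_NONBLOCK,
                       0600, &attr);
  if (mqd == (mqd_t) -1)
    return -1;
  mq_unlink (name);   /* the queue itself lasts until mq_close() below */

  /* All bytes of the struct are sent, including any zero bytes */
  if (mq_send (mqd, (const char *) in, sizeof (*in), 10) == -1)
    {
      mq_close (mqd);
      return -1;
    }

  /* Receive into a plain byte buffer, then copy into the struct */
  char buffer[sizeof (struct message)];
  ssize_t len = mq_receive (mqd, buffer, sizeof (buffer), NULL);
  mq_close (mqd);
  if (len != (ssize_t) sizeof (struct message))
    return -1;

  memcpy (out, buffer, sizeof (*out));
  return 0;
}
```

Because the raw bytes of the struct are transferred, the sender and receiver need to agree on its layout, which is the case when both are built from the same declaration for the same platform.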
The default behavior for POSIX message queues is to perform blocking I/O when writing to a full queue (or reading from an empty one). If this behavior is undesirable, there are three alternatives that can be used. The first is to include the O_NONBLOCK option in the oflag bit mask to open the queue in non-blocking mode. If the queue is ever full, mq_send() will return an error (with errno set to EAGAIN) without blocking; mq_receive() behaves the same way when the queue is empty. The second option is to use mq_timedsend() and mq_timedreceive(), which use an additional parameter (abs_timeout) to limit the amount of time to wait when blocked. Note that abs_timeout is an absolute time at which the call gives up, rather than a duration. Finally, rather than attempting to retrieve a message that may not have been sent, a process can use mq_notify() to request an asynchronous notification that a message has been sent. Interested readers should consult the system documentation (such as the man pages) for more information on these functions.
C library functions – <mqueue.h>
int mq_timedsend (mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
ssize_t mq_timedreceive (mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
int mq_notify (mqd_t mqdes, const struct sigevent *notification);
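The first two alternatives can be sketched as follows. The helper and function names, the queue name, and the attribute values are assumptions for illustration, and mq_notify() is not covered here. Both functions try to receive from an empty queue and report the resulting errno value.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

/* Creates an empty, already unlinked queue (hypothetical helper).
   Returns (mqd_t) -1 on failure. */
static mqd_t
open_empty_queue (int extra_flags)
{
  char name[64];
  snprintf (name, sizeof (name), "/block_demo_%ld", (long) getpid ());
  struct mq_attr attr = { .mq_maxmsg = 1, .mq_msgsize = 16 };
  mqd_t mqd = mq_open (name, O_CREAT | O_EXCL | O_RDWR | extra_flags,
                       0600, &attr);
  if (mqd != (mqd_t) -1)
    mq_unlink (name);   /* the queue itself lasts until it is closed */
  return mqd;
}

/* Returns the errno value from mq_receive() on an empty queue opened with
   O_NONBLOCK (0 if the call succeeds, -1 if setup fails). */
int
nonblocking_receive_errno (void)
{
  char buffer[16];
  mqd_t mqd = open_empty_queue (O_NONBLOCK);
  if (mqd == (mqd_t) -1)
    return -1;

  errno = 0;
  int result = 0;
  if (mq_receive (mqd, buffer, sizeof (buffer), NULL) == -1)
    result = errno;
  mq_close (mqd);
  return result;
}

/* Returns the errno value from mq_timedreceive() after waiting up to
   wait_ms milliseconds on an empty blocking queue (0 if the call
   succeeds, -1 if setup fails). */
int
timed_receive_errno (long wait_ms)
{
  assert (wait_ms >= 0);
  char buffer[16];
  mqd_t mqd = open_empty_queue (0);
  if (mqd == (mqd_t) -1)
    return -1;

  /* abs_timeout is an absolute CLOCK_REALTIME time, so start from "now" */
  struct timespec deadline;
  clock_gettime (CLOCK_REALTIME, &deadline);
  deadline.tv_sec += wait_ms / 1000;
  deadline.tv_nsec += (wait_ms % 1000) * 1000000L;
  if (deadline.tv_nsec >= 1000000000L)
    {
      deadline.tv_sec++;
      deadline.tv_nsec -= 1000000000L;
    }

  errno = 0;
  int result = 0;
  if (mq_timedreceive (mqd, buffer, sizeof (buffer), NULL, &deadline) == -1)
    result = errno;
  mq_close (mqd);
  return result;
}
```

With an empty queue, the non-blocking call is expected to fail immediately with EAGAIN, while the timed call blocks until the deadline passes and then fails with ETIMEDOUT.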