A Deep Dive into POSIX Message Queues

0
866
IPC

This article throws some light on the advanced concepts of POSIX message queues. A basic knowledge of these queues will be needed to understand and appreciate the article better.

POSIX message queues are a means of inter-process communication (IPC) based on message exchange. Two or more processes exchange data in the form of messages (as a whole, without any provision to exchange partial bytes of the message), and can prioritise these messages by allowing the sender to set the priority through a parameter accompanying the message. Programs intending to use POSIX message queues must include real-time library librt. POSIX message queues are available to Linux user space, but the messages are stored in kernel space in a virtual file system. One can view the created POSIX message queues in /dev/mqueue/. POSIX message queue support is an optional kernel component that is configured via CONFIG_POSIX_MQUEUE kernel configuration option, and is enabled by default. There are other implementations that may provide such a feature, but the details are likely to differ. Using POSIX message queues, data structures can also be shared between processes in the form of messages.

Limitations of using POSIX message queues

POSIX message queues are message based inter-process communication (IPC), which means the entire message is received at once by the reading process when it calls mq_read(). Reading part of a message or just a few bytes of a message is not possible.

In addition, there is no way to check how many unread messages are present in the message queue.

It is only possible to find out how many bytes are available to be read from the message queue. This is the total size (in bytes) of unread messages, which can be more than one message. However, it is not possible to break down these bytes and find out how many messages these unread bytes in the queue comprise.

To find out how many unread bytes are in the message queue, a process can call the mq_getattr() API. The API mq_getattr() takes two parameters — input message queue descriptor of the intended queue and an output parameter, as shown below:

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

The output parameter of type struct mq_attr has a member element of type mq_curmsgs, which tells the number of bytes available in the message queue to be read.
Messages in a POSIX message queue can be read one after the other by repeatedly calling mq_read(). Each mq_read() call gets only one message and to read the next message, the process must call mq_read() again, and multiple times thereafter to empty the message queue one by one.

In blocking mode, mq_read() gets unblocked when a message is received from the message queue. In non-blocking mode, mq_read() returns EAGAIN when no message is received, and non-zero positive value when a message is received. As mentioned earlier, receiving just a part of the message or a few bytes of a message is not possible.

Behavioural aspects of POSIX message queue

Creating/opening of message queues

mq_open() is the API to create a POSIX message queue or to open an existing POSIX message queue, with the first parameter being the name of the queue. It is the second parameter oflag that decides the behaviour of mq_open() API.

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);

To open an existing POSIX message queue, use O_RDONLY for read-only, O_WRONLY for write only, and O_RDWR for both send and receive. To create a non-existing POSIX message queue, use O_CREAT. To open in non-blocking mode, use O_NONBLOCK. If this is not used, then by default it would be a blocking queue. To check and create a non-existent new POSIX message queue, use O_EXCL. If the queue already exists, then it fails and returns EXIST.

If the message queue with the given name doesn’t exist, mq_open() examines the third and fourth arguments to create a new POSIX message queue described by parameters mode_t and mq_attr.

Message queue descriptors

The file descriptor returns when any process calls mq_open(), always starts from 3 and increments thereon for every mq_open(). This is because Linux has reserved 0, 1 and 2 for stdin, stdout, and stderr respectively. For example, if a process A calls mq_open() to open/create a message queue named /mq_A, 3 is returned as mqd_t if this is the first POSIX message queue opened by process A. At the same time, if another process, let us say process B, calls mq_open() to open/create a message queue name /mq_B, here too 3 is returned as mqd_t, if this is the first POSIX message queue opened by process B.

The code given below demonstrates that for the same message queue name, mq_open() may return a different queue descriptor of type mqd_t (int), for different (sender and receiver) processes.

// pmq_sender_desc.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>

#define QUEUE_PERMISSIONS 0660
#define MQ_MAX_MESSAGES 10
#define MQ_MAX_MSG_SIZE 128
#define MQ_MSG_BUFFER_SIZE MQ_MAX_MSG_SIZE + 10

int main(int argc, char **argv)
{
mqd_t mqd_sender;
struct mq_attr attr;
const char *mq_name = “/mq-postman”;
char tx_buffer[MQ_MSG_BUFFER_SIZE] = {“Hello there - “};

attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MESSAGES;
attr.mq_msgsize = MQ_MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

if((mqd_sender = mq_open(mq_name, O_WRONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == (mqd_t)-1)
{
perror(“Client: mq_open”);
exit(1);
}
else
{
printf(“Sender MQ %s opened with desc: %d\n”, mq_name, mqd_sender);
}

for(int i=0;i<10;i++)
{
sprintf(&tx_buffer[14], “%d”, i);
printf(“Sender: sending message: [%s]\n”, tx_buffer);
// send message to message queue
if(mq_send(mqd_sender, &tx_buffer[0], strlen (tx_buffer), 0) == (mqd_t)-1)
{
perror(“Sender: Unable to send message”);
}
}

if(mq_close(mqd_sender) == (mqd_t)-1)
{
perror(“Sender: mq_close”);
exit(1);
}

printf(“Sender: Exit\n”);

exit(0);
}

// pmq_receiver_desc.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>

#define MQ_QUEUE_PERMISSIONS 0660
#define MQ_MAX_MESSAGES 10
#define MQ_MAX_MSG_SIZE 128
#define MQ_MSG_BUFFER_SIZE MQ_MAX_MSG_SIZE + 10

int main (int argc, char **argv)
{
mqd_t mqd_receiver, mqd_test;
const char *postman_queue = “/mq-postman”;
const char *test_queue = “/mq-test”;
struct mq_attr attr;
char rx_buffer[MQ_MSG_BUFFER_SIZE];

attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MESSAGES;
attr.mq_msgsize = MQ_MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

if((mqd_test = mq_open(test_queue, O_RDONLY | O_CREAT, MQ_QUEUE_PERMISSIONS, &attr)) == (mqd_t)-1)
{
perror (“Receiver: mq_open (test)”);
exit (1);
}
else
{
printf (“Test MQ %s opened with desc: %d\n”, test_queue, mqd_test);
}
if((mqd_receiver = mq_open(postman_queue, O_RDONLY | O_CREAT, MQ_QUEUE_PERMISSIONS, &attr)) == (mqd_t)-1)
{
perror(“Receiver: mq_open (receiver)”);
exit(1);
}
else
{
printf(“Receiver MQ %s opened with desc: %d\n”, postman_queue, mqd_receiver);
}

int i=10;
while(i)
{
printf(“Receiver: blocked until message arrives...\n”);
// get the oldest message with highest priority
if(mq_receive (mqd_receiver, rx_buffer, MQ_MSG_BUFFER_SIZE, NULL) == (mqd_t)-1)
{
perror(“Receiver: mq_receive”);
exit(1);
}
else
{
printf(“Receiver: message received: [%s]\n”, rx_buffer);
i--;
}
}

if(mq_close(mqd_receiver) == (mqd_t)-1)
{
perror(“Receiver: mq_close (receiver)”);
exit(1);
}
if(mq_close(mqd_test) == (mqd_t)-1)
{
perror(“Receiver: mq_close (test)”);
exit(1);
}
printf(“Receiver: Exit\n”);

exit(0);
}

You can compile the above two programs and any other program in this document by giving the following command in a terminal:

$gcc -lrt -o <executable_name> <c source file>

Run pmq_receiver_desc first in a terminal followed by pmq_sender_desc in another terminal.

The observation in Figure 1 and Figure 2 is that the pmq_sender_desc process opened the POSIX message queue mq-postman and got the descriptor as 3. pmq_receiver_desc process opened the same POSIX message queue as pmq_sender_desc process but got the descriptor as 4. This is because pmq_receiver_desc process had opened another POSIX message queue /mq-test, which got descriptor 3 before opening the /mq-postman queue. This shows that message send or receive works, although the descriptor values could possibly be different for different processes that are dealing with the same message queue.

A sender process opening a message queue and printing the descriptor
Figure 1: A sender process opening a message queue and printing the descriptor
A receiver process opening a message queue and printing the descriptor
Figure 2: A receiver process opening a message queue and printing the descriptor

Message queue full

The APIs mq_send() and mq_receive() return EAGAIN if the POSIX message queue is created in non-blocking mode (O_NONBLOCK set in oflag) and there is no message sent or received.

The API mq_timedsend() may block for a pre-set duration, if a queue is full and an attempt to send a message to this already full queue is made. Likewise, the API mq_timedreceive() may block for a pre-set duration, if the queue is empty and an attempt to receive a message from this already empty queue is made.

Single sender and multiple reader access

In an IPC, more than one process/program can open the same named POSIX message queue to write/read. Let us take an example where we have a single writer process and multiple reader processes. A single writer process opens a queue in write mode and the same queue is opened in read mode by more than one reader process. The multiple reader processes are nothing but the same program with multiple copies of its executable.

The code given below demonstrates the behaviour when multiple receiver processes are waiting to receive the messages from the same message queue. A single process sends a series of messages to the message queue.

// pmq_sender_ssmr.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>

#define MQ_QUEUE_PERMISSIONS 0660
#define MQ_MAX_MESSAGES 10
#define MQ_MAX_MSG_SIZE 128
#define MQ_MSG_BUFFER_SIZE MQ_MAX_MSG_SIZE + 10

int main(int argc, char **argv)
{
mqd_t mqd_sender;
struct mq_attr attr;
const char *mq_name = “/mq-postman”;
char tx_buffer[MQ_MSG_BUFFER_SIZE] = {“Hello there - “};

attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MESSAGES;
attr.mq_msgsize = MQ_MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

if((mqd_sender = mq_open(mq_name, O_WRONLY | O_CREAT, MQ_QUEUE_PERMISSIONS, &attr)) == (mqd_t)-1)
{
perror(“Sender: mq_open(sender)”);
exit(1);
}
else
{
printf(“Sender MQ %s opened\n”, mq_name);
}

for(int i=0;i<10;i++)
{
sprintf(&tx_buffer[14], “%d”, i);
printf(“Sender: sending message: [%s]\n”, tx_buffer);
// send message to message queue
if(mq_send(mqd_sender, &tx_buffer[0], strlen (tx_buffer), 0) == (mqd_t)-1)
{
perror(“Sender: Unable to send message”);
}
}

if(mq_close(mqd_sender) == (mqd_t)-1)
{
perror(“Sender: mq_close”);
exit(1);
}

printf(“Sender: Exit\n”);

exit(0);
}

// pmq_receiver_ssmr.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>

#define MQ_QUEUE_PERMISSIONS 0660
#define MQ_MAX_MESSAGES 10
#define MQ_MAX_MSG_SIZE 128
#define MQ_MSG_BUFFER_SIZE MQ_MAX_MSG_SIZE + 10

int main(int argc, char **argv)
{
mqd_t mqd_receiver;
const char *mq_name = “/mq-postman”;
struct mq_attr attr;
char rx_buffer[MQ_MSG_BUFFER_SIZE];

attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MESSAGES;
attr.mq_msgsize = MQ_MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

if((mqd_receiver = mq_open(mq_name, O_RDONLY | O_CREAT, MQ_QUEUE_PERMISSIONS, &attr)) == (mqd_t)-1)
{
perror(“Receiver: mq_open”);
exit(1);
}
else
{
printf(“Receiver MQ %s opened\n”, mq_name);
}
int i=9;
while(i)
{
printf(“Receiver: blocked until message arrives...\n”);
// get the oldest message with highest priority first from message queue
if(mq_receive(mqd_receiver, rx_buffer, MQ_MSG_BUFFER_SIZE, NULL) == (mqd_t)-1)
{
perror (“Receiver: mq_receive”);
exit (1);
}
else
{
printf(“Receiver: message received: [%s]\n”, rx_buffer);
i--;
}
}

if (mq_close(mqd_receiver) == (mqd_t)-1)
{
perror (“Sender: mq_close”);
exit (1);
}

printf(“Receiver: Exit\n”);

exit(0);
}

After compiling pmq_receiver_ssmr.c, make two copies of its executable.

Run the two receivers viz pmq_receiver_ssmr and pmq_receiver_ssmr2 in two different terminals first, followed by pmq_sender_ssmr in the third terminal.

In Figure 3, a single sender process sends a series of messages to the message queue. In Figure 4 and Figure 5, two receiver processes wait to receive messages from the same message queue. We can see that there is no specific order in which the receiver processes the messages sent by the sender.

 A single sender process sending a series of messages to a message queue
Figure 3: A single sender process sending a series of messages to a message queue
Receiver1 processes waiting and receiving messages from a message queue
Figure 4: Receiver1 processes waiting and receiving messages from a message queue
Receiver2 processes waiting and receiving messages from a message queue
Figure 5: Receiver2 processes waiting and receiving messages from a message queue

Setting priority to messages

One of the features of POSIX message queues is the ability to set priority levels for the messages when they are being sent. This helps the receiver process to receive the messages with the highest priority first among the other priorities, irrespective of the order in which they were sent by the sender process.

The code given below demonstrates the behaviour where a receiver processes the highest priority message in the message queue first, followed by the lower priority message in the queue. A sender process sends a series of messages with alternating priorities to the message queue.

// pmq_sender_priority.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>

#define MQ_PERMISSIONS 0660
#define MQ_MAX_MESSAGES 10
#define MQ_MAX_MSG_SIZE 128
#define MQ_MSG_BUFFER_SIZE MQ_MAX_MSG_SIZE + 10

int main(int argc, char **argv)
{
mqd_t mqd_sender;
struct mq_attr attr;
const char *mq_name = “/mq-postman”;
unsigned int msg_priority;
char tx_buffer[MQ_MSG_BUFFER_SIZE] = {“Hello there - “};

attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MESSAGES;
attr.mq_msgsize = MQ_MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

if((mqd_sender = mq_open(mq_name, O_WRONLY | O_CREAT, MQ_PERMISSIONS, &attr)) == (mqd_t)-1)
{
perror(“Sender: mq_open”);
exit(1);
}
else
{
printf(“Sender MQ %s opened\n”, mq_name);
}

for(int i=1;i<=9;i++)
{
if(i%2 == 0)
msg_priority = 0;
else
msg_priority = 10;

sprintf(&tx_buffer[14], “%d”, i);

printf(“Sender: sending message: [%s] with priority: [%d]\n”, tx_buffer, msg_priority);
// send message to message queue
if(mq_send(mqd_sender, &tx_buffer[0], strlen (tx_buffer), msg_priority) == (mqd_t)-1)
{
perror(“Sender: Unable to send message”);
}
}

if(mq_close(mqd_sender) == (mqd_t)-1)
{
perror(“Sender: mq_close”);
exit(1);
}
printf(“Sender: Exit\n”);

exit(0);
}


// pmq_receiver_priority.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>

#define MQ_PERMISSIONS 0660
#define MQ_MAX_MESSAGES 10
#define MQ_MAX_MSG_SIZE 128
#define MQ_MSG_BUFFER_SIZE MQ_MAX_MSG_SIZE + 10

int main(int argc, char **argv)
{
mqd_t mqd_receiver;
const char *mq_name = “/mq-postman”;
struct mq_attr attr;
unsigned int msg_priority;
char rx_buffer[MQ_MSG_BUFFER_SIZE];

attr.mq_flags = 0;
attr.mq_maxmsg = MQ_MAX_MESSAGES;
attr.mq_msgsize = MQ_MAX_MSG_SIZE;
attr.mq_curmsgs = 0;

if((mqd_receiver = mq_open(mq_name, O_RDONLY | O_CREAT, MQ_PERMISSIONS, &attr)) == (mqd_t)-1)
{
perror(“Receiver: mq_open”);
exit(1);
}
else
{
printf(“Receiver MQ %s opened\n”, mq_name);
}

int i=9;
while(i)
{
printf(“Receiver: blocked, waiting for messages...\n”);
// get the highest priority message first
if(mq_receive(mqd_receiver, rx_buffer, MQ_MSG_BUFFER_SIZE, &msg_priority) == (mqd_t)-1)
{
perror(“Sender: mq_receive”);
exit(1);
}
else
{
printf(“Receiver: message received: [%s] with priority: [%d]\n”, rx_buffer, msg_priority);
i--;
}
}

if(mq_close(mqd_receiver) == (mqd_t)-1)
{
perror(“Receiver: mq_close”);
exit(1);
}

printf(“Receiver: Exit\n”);

exit(0);
}

In the above example, run the pmq_receiver_priority first in a terminal, followed by pmq_sender_priority in another terminal.

As demonstrated in Figures 6 and 7, the sender process sends 9 messages to the message queue, with message priority alternating between 0 and 10. The receiver process receives the highest priority (10) messages available in the queue first, and then the lower priority (0) messages.

A sender process sending a series of messages with alternating priority to the message queue
Figure 6: A sender process sending a series of messages with alternating priority to the message queue
A receiver process receiving the highest priority messages first from the message queue
Figure 7: A receiver process receiving the highest priority messages first from the message queue

Deleting message queues

POSIX message queues have kernel persistence. This means a queue stays in kernel space even after the process that created it exits without deleting the queue. A process can delete a message queue by calling mq_unlink() and passing the name of the message queue as the parameter. A user, too, can remove the message queue by giving the command rm /dev/mqueue/<qname>. If a process calls mq_unlink() with the unread messages in the queue, the said POSIX message queue can still get deleted from kernel space.

Pitfalls

One feature that differentiates POSIX message queues from System V message queues is the asynchronous notification when a new message arrives on an empty queue. When using asynchronous notification to receive messages, a process can be notified either by a signal or by invocation of a handler. A process needs to register itself to receive asynchronous notification, so that when a message arrives on an empty queue, a notification is sent to the registered process. By doing so, when a message arrives in an empty queue, the registered process is notified where the message should be read. To get notification on the arrival of another message, the process must re-register and receive asynchronous notification. This needs to be done every time a message arrives in an empty queue.

Suitable care needs to be exercised if a process sends a message in-between the mq_receive() and mq_notify(), as asynchronous notification works only when a message is received on an empty queue. If the queue already has messages and mq_notify() is called, the notification will never happen and, consequently, the messages will never be read out from the queue.

If a fork() is named after mq_notify(), then the child process doesn’t inherit the parent’s message queue notification registration. In the event of exec() or process termination, message queue descriptors associated with this process are closed and the message notification registration on the said queue is de-registered. In either case, the POSIX message queue still remains in the system unless mq_unlink() is called, as explained earlier.

When the design requires that mq_notify() is used, ensure that POSIX message queue is opened in non-blocking mode.

A single POSIX message queue can be used for both sending and receiving a message between two processes. But it gets challenging to find out whether the message in the queue is something sent by the sender to be read by the receiver or is it the (response) message sent by the receiver to be read by the sender. So it is always better to make use of two different message queues, one for sending messages and one for receiving them.

I have demonstrated the advanced concepts or behaviour of the POSIX message queues with supporting source code and a screenshot of the execution console. This article will be helpful to developers/architects who intend to use POSIX message queues beyond the basic level.

LEAVE A REPLY

Please enter your comment!
Please enter your name here