Linux环境提供了XSI和POSIX两套消息队列,本文分开讨论这两种消息队列。

XSI 消息队列(msg)

概念

  • 消息队列传递的数据具有某种结构,而不是简单的字节流。
  • 消息队列的本质其实是一个内核提供的链表,内核基于这个链表,实现了一个数据结构
  • 向消息队列中写数据,实际上是向这个数据结构中插入一个新结点;从消息队列汇总读数据,实际上是从这个数据结构中删除一个结点
  • 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
  • 消息队列也有管道一样的不足,就是每个数据块的最大长度是有上限的,系统上全体队列的最大总长度也有一个上限

工作原理

image.png

相关函数

XSI (X/Open System Interface)消息队列是由内核提供的功能,有一些内核函数可用于操作 XSI 消息队列

1
2
3
4
5
6
7
#include <sys/msg.h>
int msgget(key_t key, int msgflg); // 创建或访问一个消息队列
int msgsnd(int msqid, const void msgp[.msgsz], size_t msgsz,
int msgflg)
; // 向队列尾部添加消息

ssize_t msgrcv(int msqid, void msgp[.msgsz], size_t msgsz, long msgtyp,
int msgflg); // 从队列中取出一条消息,和 fifo 不同的是,可以指定消息
int msgctl(int msqid, int cmd, struct msqid_ds *buf); // 控制消息队列 读取、修改、删除等

示例代码

发送方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdio.h>
#include <sys/msg.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <string>

const static int TEST_MSG_KEY = 1234;
const static int TEST_MSG_TYPE = 1;

struct my_msg_st {
long int mtype;
char mtext[BUFSIZ];
};

int main() {
printf("start sender\n");
// 创建消息队列
int qid = msgget(TEST_MSG_KEY, IPC_CREAT | 0666);
// 创建消息
struct my_msg_st msg;
msg.mtype = TEST_MSG_TYPE;
while (true)
{
time_t t;
time(&t);
snprintf(msg.mtext, sizeof(msg.mtext), "a message at %s", ctime(&t));
// 发送消息
// if (msgsnd(qid, &msg, sizeof(msg.mtext), IPC_NOWAIT) == -1) // 非阻塞,无接受者时返回错误码
if (msgsnd(qid, &msg, sizeof(msg.mtext), 0) == -1) // 阻塞,无接收者时阻塞等待
{
perror("msgsnd error");
exit(EXIT_FAILURE);
}
printf("sent: %s\n", msg.mtext);
sleep(1);
}
return 0;
}

接收方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <errno.h>
#include <unistd.h>

const static int TEST_MSG_KEY = 1234;
const static int TEST_MSG_TYPE = 1;

struct my_msg_st {
long int mtype;
char mtext[BUFSIZ];
};

int main() {
printf("start reciver\n");
// 获取消息队列
int qid = msgget(TEST_MSG_KEY, IPC_CREAT | 0666);
// 读取消息
struct my_msg_st msg;
while (true)
{
if (msgrcv(qid, &msg, sizeof(msg.mtext), TEST_MSG_TYPE, MSG_NOERROR | IPC_NOWAIT ) == -1) {
if (errno != ENOMSG) {
perror("msgrcv");
exit(EXIT_FAILURE);
}
printf("No message available for msgrcv()\n");
} else {
printf("message received: %s\n", msg.mtext);
}
sleep(1);
}
return 0;
}

image.png

POSIX 消息队列(mqueue)

POSIX消息队列是独立于XSI消息队列的一套新的消息队列API,让进程可以用消息的方式进行数据交换。这套消息队列在Linux 2.6.6版本之后开始支持,还需要你的glibc版本必须高于2.3.4。

和 XSI 消息队列的联系和区别

XSI 消息队列和POSIX消息队列都是在Linux环境中实现进程间通信(IPC)的方法。它们都允许进程通过发送和接收消息来交换数据和同步操作。它们之间存在一些关键区别:

  • 标准:XSI 消息队列遵循X/Open系统接口(XSI)标准,而POSIX消息队列遵循可移植操作系统接口(POSIX)标准。

  • API:这两种消息队列使用不同的API集。System V 消息队列使用System V IPC API(如msgget、msgsnd、msgrcv等),而POSIX消息队列使用POSIX IPC API(如mq_open、mq_send、mq_receive等)。

  • 兼容性:POSIX消息队列在不同的Unix-like系统上具有更好的可移植性,因为它们遵循POSIX标准。而XSI消息队列可能在不同的系统上表现不一致,因为它们遵循较早的XSI标准。

  • 功能:在某些方面,POSIX消息队列提供了比XSI消息队列更丰富的功能。例如,POSIX消息队列支持优先级消息和非阻塞操作,而XSI消息队列不支持这些特性。

  • 资源限制:XSI消息队列和POSIX消息队列在资源限制方面有所不同。例如,XSI消息队列的大小受系统参数限制,而POSIX消息队列的大小可以在创建时指定。

相关函数

1
2
3
4
5
6
7
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);
  • 类似对文件的open,我们可以用mq_open来打开一个已经创建的消息队列或者创建一个消息队列。这个函数返回一个叫做mqd_t类型的返回值,其本质上还是一个文件描述符,只是在这这里被叫做消息队列描述符(message queue descriptor),在进程里使用这个描述符对消息队列进程操作。

  • 所有被创建出来的消息队列在系统中都有一个文件与之对应,这个文件名是通过name参数指定的,这里需要注意的是:name必须是一个以”/”开头的字符串,比如我想让消息队列的名字叫”message”,那么name应该给的是”/message”。消息队列创建完毕后,会在/dev/mqueue目录下产生一个以name命名的文件,我们还可以通过cat这个文件来看这个消息队列的一些状态信息。其它进程在消息队列已经存在的情况下就可以通过mp_open打开名为name的消息队列来访问它。

  • 可以使用多路复用(epoll / poll / select) 来操作这个 fd

1
2
3
4
5
6
mq_close(3)    // 关闭 消息队列fd
mq_getattr(3) // 获取属性
mq_notify(3) // 异步消息通知
mq_receive(3) // 读取消息
mq_send(3) // 发送消息
mq_unlink(3) // 删除消息队列文件

示例代码

发送方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <mqueue.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <unistd.h>

#define MQNAME "/mqtest"

int main() {
printf("start sender\n");
// 创建消息队列文件
mqd_t mqd = mq_open(MQNAME, O_RDWR|O_CREAT, 0600, 0);
if (mqd == -1) {
perror("mq_open()");
exit(1);
}
int ret;
char msg[BUFSIZ];
while (true)
{
time_t t;
time(&t);
snprintf(msg, sizeof(msg), "a message at %s", ctime(&t));
ret = mq_send(mqd, msg, strlen(msg), NULL);
if (ret == -1) {
perror("mq_send()");
exit(1);
}
printf("sent msg: %s\n", msg);
sleep(1);
}

return 0;
}

可以先发送个几秒钟消息,通过 cat /dev/mqueue/mqtest 查看消息队列状态

接收方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <stdio.h>
#include <mqueue.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <unistd.h>

#define MQNAME "/mqtest"

int main() {
printf("start reciver\n");
// 打开消息队列文件
mqd_t mqd = mq_open(MQNAME, O_RDWR);
if (mqd == -1) {
perror("mq_open()");
exit(1);
}
int ret;
unsigned int prio;
char msg[BUFSIZ];
while (true)
{
ret = mq_receive(mqd, msg, BUFSIZ, &prio);
if (ret == -1) {
perror("mq_receive()");
exit(1);
}
printf("recive msg: %s,%d\n", msg, prio);
sleep(1);
}

return 0;
}

参考文档

☞ 参与评论