0%

Principles:msg底层原理

msg 简述

消息队列,是消息的链接表,存放在内核中,一个消息队列由一个标识符(即ID)来标识

  • 消息队列的标识符 key 键,它的基本类型是 key_t,使用 ftok 函数可以生成一个 key_t
  • 两个无关的进程,可以通过唯一标识符 key 来找到对应的 msg

共享内存,消息队列,信号量它们三个都是找一个中间介质来进行通信的,就是文件的设备编号和节点,ftok() 就可以通过“文件路径”来获取一个 key_t 键值

1
2
/* 系统IPC键值的格式转换函数 */
key_t ftok(const char * fname, int id);
  • fname:
    • 指定的文件名,这个文件必须是存在的而且可以访问的
    • 只是根据文件 inode 在系统内的唯一性来取一个数值,和文件的权限无关
  • id:
    • 子序号,它是一个8bit的整数,即范围是0~255
    • 可以根据自己的约定,随意设置
  • return:
    • 成功:返回 key_t 键值
    • 出错:返回 “-1”

msg API

1
2
/* 创建或打开消息队列:成功返回队列ID,失败返回-1 */
int msgget(key_t key, int msgflg);
  • key:
    • IPC_PRIVATE - “0”:会建立新的消息队列(只能单进程通信,不能在两个进程之间进行通信)
    • 大于0的32位整数:视参数 msgflg 来确定操作,通常要求此值来源于 ftok 返回的 IPC 键值
  • msgflg:
    • “0”:取消息队列标识符,若不存在则函数会报错
    • IPC_CREAT:如果内核中不存在键值与 key 相等的消息队列,则新建一个消息队列,如果存在这样的消息队列,返回此消息队列的标识符
    • IPC_CREAT | IPC_EXCL:如果内核中不存在键值与 key 相等的消息队列,则新建一个消息队列,如果存在这样的消息队列则报错
  • return:
    • 成功:返回消息队列的标识符
    • 出错:返回 “-1”,错误原因存于 error 中
1
2
/* 添加消息:成功返回0,失败返回-1 */
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
  • msqid:
    • 消息队列标识符
  • msgp:
    • 发送给队列的消息
    • msgp 可以是任何类型的结构体,但第一个字段必须为 long 类型,即表明此发送消息的类型
  • msgsz:
    • 要发送消息的大小(不含消息类型占用的4个字节)
  • msgflg:
    • “0”:当消息队列满时,msgsnd 将会阻塞,直到消息能写进消息队列
    • IPC_NOWAIT:当消息队列已满的时候,msgsnd 函数不等待立即返回
    • MSG_NOERROR:若发送的消息大于 size 字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程
  • return:
    • 成功:返回 “0”
    • 出错:返回 “-1”,错误原因存于 error 中
1
2
/* 读取消息:成功返回消息数据的长度,失败返回-1 */
int msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
  • msqid:
    • 消息队列标识符
  • msgp:
    • 存放消息的结构体
    • 结构体类型要与 msgsnd 函数发送的类型相同
  • msgsz:
    • 要接收消息的大小(不含消息类型占用的4个字节)
  • msgtyp:
    • “0”:接收第一个消息
    • 大于零:接收类型等于 msgtyp 的第一个消息
    • 小于零:接收类型等于或者小于 msgtyp 绝对值的第一个消息
  • msgflg:
    • “0”:阻塞式接收消息,没有该类型的消息 msgrcv 函数一直阻塞等待
    • IPC_NOWAIT:如果没有符合条件的 msg 则立即返回“-1”,此时错误码为 ENOMSG
    • MSG_COPY:内核会将 message 拷贝一份后再拷贝到用户空间,原双向链表中的 message 并不会被 unlink
    • MSG_EXCEPT:返回队列中第一个类型不为 msgtype 的消息
    • MSG_NOERROR:如果队列中满足条件的消息内容大于所请求的 size 字节,则把该消息截断,截断部分将被丢弃
  • return:
    • 成功:返回 “0”
    • 出错:返回 “-1”,错误原因存于 error 中
1
2
/* 控制消息队列:成功返回0,失败返回-1 */
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  • msqid:
    • 消息队列标识符
  • cmd:
    • IPC_STAT:获得 msgid 的消息队列头数据到 buf 中
    • IPC_SET:设置消息队列的属性,要设置的属性需先存储在 buf 中,可设置的属性包括:
      • msg_perm.uid、msg_perm.gid、msg_perm.mode 以及 msg_qbytes
  • return:
    • 成功:“0”
    • 出错:“-1”,错误原因存于 error 中

msg 使用案例

read.c 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
struct msgbuf{
long mtype; /* message type, must be > 0 */
char mtext[128]; /* message data */
};
int main()
{
struct msgbuf sendbuf={999,"888 message already received"};
struct msgbuf readbuf;
int msgid = 0;
key_t key;
key = ftok(".",'z');//获取键值
printf("key=%x\n",key);
msgid=msgget(key,IPC_CREAT|0777);//在内核中打开或建立键值为key的,权限为0777的消息队列
if(msgid == -1){
printf("create msgq failure\n");
}
msgrcv(msgid,&readbuf,sizeof(readbuf.mtext),888,0);//从队列中获取888类型的数据,如果队列中未出现888类型的数据,则程序阻塞在这里
printf("read from que:%s\n",readbuf.mtext);
msgsnd(msgid,&sendbuf,strlen(sendbuf.mtext),0);//往队列id为msgid的队列写入sendbuf(类型为999)数据
msgctl(msgid,IPC_RMID,NULL);//将队列从系统内核中删除
return 0;
}

send.c 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
struct msgbuf{
long mtype; /* message type, must be > 0 */
char mtext[128]; /* message data */
};
int main()
{
struct msgbuf sendbuf={888,"this is message from que"};
struct msgbuf readbuf;
int msgid= 0;
key_t key;
key = ftok(".",'z');//获取键值
printf("key=%x\n",key);
msgid=msgget(key,IPC_CREAT|0777);//在内核中打开或建立键值为key的,权限为0777的消息队列
if(msgid == -1){
printf("create msgq failure\n");
}
msgsnd(msgid,&sendbuf,strlen(sendbuf.mtext),0);//往队列id为msgid的队列写入sendbuf(类型为888)数据
msgrcv(msgid,&readbuf,sizeof(readbuf.mtext),999,0);//从队列中获取999类型的数据,如果队列中未出现999类型的数据,则程序阻塞在这里
printf("%s\n",readbuf.mtext);
msgctl(msgid,IPC_RMID,NULL);//将队列从系统内核中删除
return 0;
}

结果:

1
2
3
exp ./read 
key=7a05274f /* 获取同一个key */
read from que:this is message from que
1
2
3
exp ./send          
key=7a05274f /* 获取同一个key */
888 message already received
  • 如果只在同一个进程中传递信息,则不提供 key 也可以

Linux 中 msg 的实现

创建或打开消息队列 msgget

1
2
3
0x7ffff7ee3019 <msgget+9>        syscall  <SYS_msgget>
key: 0x7a05274f
msgflg: 0x3ff
  • 用户态的底层就是一个 syscall
  • 内核态中就是以下这个函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct ipc_ops {
int (*getnew)(struct ipc_namespace *, struct ipc_params *);
int (*associate)(struct kern_ipc_perm *, int);
int (*more_checks)(struct kern_ipc_perm *, struct ipc_params *);
};

long ksys_msgget(key_t key, int msgflg)
{
struct ipc_namespace *ns;
static const struct ipc_ops msg_ops = {
.getnew = newque,
.associate = security_msg_queue_associate,
}; /* 初始化"创建例程" */
struct ipc_params msg_params;

ns = current->nsproxy->ipc_ns; /* 获取当前IPC命名空间 */

msg_params.key = key; /* 键值 */
msg_params.flg = msgflg; /* 标识符 */

return ipcget(ns, &msg_ids(ns), &msg_ops, &msg_params); /* 核心函数 */
}
1
2
3
4
5
6
7
8
int ipcget(struct ipc_namespace *ns, struct ipc_ids *ids,
const struct ipc_ops *ops, struct ipc_params *params)
{
if (params->key == IPC_PRIVATE) /* 是否私有 */
return ipcget_new(ns, ids, ops, params); /* 创建一个新的ipc对象 */
else
return ipcget_public(ns, ids, ops, params); /* 获取一个ipc对象或创建一个新对象 */
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int ipcget_new(struct ipc_namespace *ns, struct ipc_ids *ids,
const struct ipc_ops *ops, struct ipc_params *params)
{
// *ns: ipc命名空间
// *ids: ipc标识符集
// *ops: 要调用的实际创建例程
// *params: 它的参数
int err;

down_write(&ids->rwsem); /* 写者申请[得到]读写信号量sem时调用 */
err = ops->getnew(ns, params); /* 其实就是执行了"创建例程"中的newque */
up_write(&ids->rwsem); /* 写者[释放]读写信号量sem时调用 */
return err;
}
  • 前面这些可以说是 “共享内存”,“信号量”,“消息队列” 的通用部分,只是 ipc_ops 结构体的初始化不同
  • 其实这里可以看出一点面向对象的思想了
  • 函数 newque 的源码如下:(创建一个新的消息队列)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static int newque(struct ipc_namespace *ns, struct ipc_params *params)
{
// *ns: 命名空间
// *params: 指向包含key和msgflg的结构体(ipc_params)
struct msg_queue *msq;
int retval;
key_t key = params->key;
int msgflg = params->flg;

msq = kvmalloc(sizeof(*msq), GFP_KERNEL); /* 为msg_queue分配内核堆空间 */
if (unlikely(!msq))
return -ENOMEM;

msq->q_perm.mode = msgflg & S_IRWXUGO;
msq->q_perm.key = key;

msq->q_perm.security = NULL;
retval = security_msg_queue_alloc(&msq->q_perm); /* 将msg_queue添加到消息队列基数树中,并取回基数树id */
if (retval) {
kvfree(msq);
return retval;
}

msq->q_stime = msq->q_rtime = 0;
msq->q_ctime = ktime_get_real_seconds();
msq->q_cbytes = msq->q_qnum = 0;
msq->q_qbytes = ns->msg_ctlmnb;
msq->q_lspid = msq->q_lrpid = NULL;
INIT_LIST_HEAD(&msq->q_messages);
INIT_LIST_HEAD(&msq->q_receivers);
INIT_LIST_HEAD(&msq->q_senders);

/* ipc_addid() locks msq upon success. */
retval = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni); /* 新创建的msg_queue结构挂到msg_ids里面的基数树上 */
if (retval < 0) {
ipc_rcu_putref(&msq->q_perm, msg_rcu_free); /* 释放目标 */
return retval;
}

ipc_unlock_object(&msq->q_perm);
rcu_read_unlock();

return msq->q_perm.id;
}
  • msgget 会在内核堆空间中创建一个 msg_queue 结构体:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct msg_queue {
struct kern_ipc_perm q_perm;
time64_t q_stime; /* last msgsnd time */
time64_t q_rtime; /* last msgrcv time */
time64_t q_ctime; /* last change time */
unsigned long q_cbytes; /* current number of bytes on queue */
unsigned long q_qnum; /* number of messages in queue */
unsigned long q_qbytes; /* max number of bytes on queue */
struct pid *q_lspid; /* pid of last msgsnd */
struct pid *q_lrpid; /* last receive pid */

struct list_head q_messages;
struct list_head q_receivers;
struct list_head q_senders;
} __randomize_layout;
  • msgsndmsgrcv 都依靠另一个重要的结构体 - msg_msg
1
2
3
4
5
6
7
struct msg_msg {
struct list_head m_list;
long m_type;
size_t m_ts; /* message text size */
struct msg_msgseg *next;
void *security; /* the actual message follows immediately */
};

往消息队列中添加消息 msgsnd

1
2
3
4
5
0x7ffff7ee2eb8 <msgsnd+24>    syscall  <SYS_msgsnd>
msqid: 0x16
msgp: 0x7fffffffddd0 ◂— 0x378
msgsz: 0x18
msgflg: 0x0
  • 用户态的底层还是一个 syscall
  • 内核态中就是以下这个函数:
1
2
3
4
5
6
7
8
9
long ksys_msgsnd(int msqid, struct msgbuf __user *msgp, size_t msgsz,
int msgflg)
{
long mtype;

if (get_user(mtype, &msgp->mtype)) /* 从用户空间获取单个数据-msg类型 */
return -EFAULT;
return do_msgsnd(msqid, mtype, msgp->mtext, msgsz, msgflg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
static long do_msgsnd(int msqid, long mtype, void __user *mtext,
size_t msgsz, int msgflg)
{
struct msg_queue *msq;
struct msg_msg *msg;
int err;
struct ipc_namespace *ns;
DEFINE_WAKE_Q(wake_q);

ns = current->nsproxy->ipc_ns;

if (msgsz > ns->msg_ctlmax || (long) msgsz < 0 || msqid < 0)
return -EINVAL;
if (mtype < 1)
return -EINVAL;

msg = load_msg(mtext, msgsz); /* 先调用"alloc_msg(msgsz)"创建一个msg_msg结构体,然后调用"copy_from_user(msg+1,mtext,msgsz)"拷贝用户空间的mtext紧跟msg_msg结构体的后面 */
if (IS_ERR(msg))
return PTR_ERR(msg);

msg->m_type = mtype; /* 写入msg类型 */
msg->m_ts = msgsz; /* 写入msg大小 */

rcu_read_lock();
msq = msq_obtain_object_check(ns, msqid); /* 通过msqid从namespace中找到对应的msq->q_perm结构体,然后调用container_of通过偏移计算得到msg_queue结构体地址 */
if (IS_ERR(msq)) {
err = PTR_ERR(msq);
goto out_unlock1;
}

ipc_lock_object(&msq->q_perm);

for (;;) {
struct msg_sender s; /* 定义了发送消息链表 */

err = -EACCES;
if (ipcperms(ns, &msq->q_perm, S_IWUGO))
goto out_unlock0;

/* raced with RMID? */
if (!ipc_valid_object(&msq->q_perm)) { /* 检查该队列是否被删除 */
err = -EIDRM;
goto out_unlock0;
}

err = security_msg_queue_msgsnd(&msq->q_perm, msg, msgflg); /* 调用一个钩子函数 */
if (err)
goto out_unlock0;
if (msg_fits_inqueue(msq, msgsz)) /* 检查消息队列是否满 */
break;

/* queue full, wait: */
if (msgflg & IPC_NOWAIT) { /* IPC_NOWAIT:当消息队列已满的时候,msgsnd函数不等待立即返回 */
err = -EAGAIN;
goto out_unlock0;
}

/* enqueue the sender and prepare to block */
ss_add(msq, &s, msgsz);

if (!ipc_rcu_getref(&msq->q_perm)) {
err = -EIDRM;
goto out_unlock0;
}

ipc_unlock_object(&msq->q_perm);
rcu_read_unlock();
schedule();
rcu_read_lock();
ipc_lock_object(&msq->q_perm);

ipc_rcu_putref(&msq->q_perm, msg_rcu_free);
/* raced with RMID? */
if (!ipc_valid_object(&msq->q_perm)) { /* 检查该队列是否被删除 */
err = -EIDRM;
goto out_unlock0;
}
ss_del(&s);
if (signal_pending(current)) { /* 仅仅检查一下是否有信号,不处理信号 */
err = -ERESTARTNOHAND;
goto out_unlock0;
}
}

ipc_update_pid(&msq->q_lspid, task_tgid(current));
msq->q_stime = ktime_get_real_seconds();

/* 如果有被阻塞的接收进程,且消息满足接收要求,则将消息直接发送给被阻塞的接收进程
否则,将消息排入消息队列尾 */
if (!pipelined_send(msq, msg, &wake_q)) {
list_add_tail(&msg->m_list, &msq->q_messages); /* 插入msg_msg链表的尾部 */
msq->q_cbytes += msgsz;
msq->q_qnum++;
atomic_add(msgsz, &ns->msg_bytes);
atomic_inc(&ns->msg_hdrs);
}

err = 0;
msg = NULL;

out_unlock0:
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
rcu_read_unlock();
if (msg != NULL)
free_msg(msg);
return err;
}
  • 这里总结一下 do_msgsnd 的功能:
    • 调用 load_msg 创建一个 msg_msg 结构体,把数据拷贝到该 msg_msg 的后面
    • 通过 msqid 计算得到 msg_queue 结构体地址
    • 检查 msg_queue 后,对将要被发送的数据进行处理:
      • 如果有被阻塞的接收进程,则将消息直接发送给被阻塞的接收进程
      • 否则,将消息排入消息队列尾

从消息队列中读取消息 msgrcv

1
2
3
4
5
6
0x7ffff7ee2f68 <msgrcv+24>    syscall  <SYS_msgrcv>
msqid: 0x16
msgp: 0x7fffffffde60 ◂— 0x0
msgsz: 0x80
msgtyp: 0x3e7
msgflg: 0x0
  • 用户态的底层还是一个 syscall
  • 内核态中就是以下这个函数:
1
2
3
4
5
long ksys_msgrcv(int msqid, struct msgbuf __user *msgp, size_t msgsz,
long msgtyp, int msgflg)
{
return do_msgrcv(msqid, msgp, msgsz, msgtyp, msgflg, do_msg_fill);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
static long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgflg,
long (*msg_handler)(void __user *, struct msg_msg *, size_t))
{
int mode;
struct msg_queue *msq;
struct ipc_namespace *ns;
struct msg_msg *msg, *copy = NULL;
DEFINE_WAKE_Q(wake_q);

ns = current->nsproxy->ipc_ns;

if (msqid < 0 || (long) bufsz < 0)
return -EINVAL;

if (msgflg & MSG_COPY) {
if ((msgflg & MSG_EXCEPT) || !(msgflg & IPC_NOWAIT))
return -EINVAL;
copy = prepare_copy(buf, min_t(size_t, bufsz, ns->msg_ctlmax)); /* 调用"load_msg(buf, bufsz)",生成msg_msg为copy做准备 */
if (IS_ERR(copy))
return PTR_ERR(copy);
}
mode = convert_mode(&msgtyp, msgflg);

rcu_read_lock();
msq = msq_obtain_object_check(ns, msqid); /* 通过msqid从namespace中找到对应的msq->q_perm结构体,然后调用container_of通过偏移计算得到msg_queue结构体地址 */
if (IS_ERR(msq)) {
rcu_read_unlock();
free_copy(copy);
return PTR_ERR(msq);
}

for (;;) {
struct msg_receiver msr_d; /* 定义了接收消息链表 */

msg = ERR_PTR(-EACCES);
if (ipcperms(ns, &msq->q_perm, S_IRUGO))
goto out_unlock1;

ipc_lock_object(&msq->q_perm);

/* raced with RMID? */
if (!ipc_valid_object(&msq->q_perm)) {
msg = ERR_PTR(-EIDRM);
goto out_unlock0;
}

msg = find_msg(msq, &msgtyp, mode); /* 查找可用的msg */
if (!IS_ERR(msg)) {
/*
* Found a suitable message.
* Unlink it from the queue.
*/
if ((bufsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {
msg = ERR_PTR(-E2BIG);
goto out_unlock0;
}
/*
* If we are copying, then do not unlink message and do
* not update queue parameters.
*/
if (msgflg & MSG_COPY) {
msg = copy_msg(msg, copy); /* MSG_COPY:将message拷贝一份后再拷贝到用户空间 */
goto out_unlock0;
}

list_del(&msg->m_list); /* 把已经接收过数据的msg脱链 */
msq->q_qnum--;
msq->q_rtime = ktime_get_real_seconds();
ipc_update_pid(&msq->q_lrpid, task_tgid(current));
msq->q_cbytes -= msg->m_ts;
atomic_sub(msg->m_ts, &ns->msg_bytes);
atomic_dec(&ns->msg_hdrs);
ss_wakeup(msq, &wake_q, false);

goto out_unlock0;
}

/* No message waiting. Wait for a message */
if (msgflg & IPC_NOWAIT) { /* IPC_NOWAIT:如果没有符合条件的msg则立即返回 */
msg = ERR_PTR(-ENOMSG);
goto out_unlock0;
}

list_add_tail(&msr_d.r_list, &msq->q_receivers); /* 插入msg_msg链表尾 */
msr_d.r_tsk = current;
msr_d.r_msgtype = msgtyp;
msr_d.r_mode = mode;
if (msgflg & MSG_NOERROR)
msr_d.r_maxsize = INT_MAX;
else
msr_d.r_maxsize = bufsz;
msr_d.r_msg = ERR_PTR(-EAGAIN);
__set_current_state(TASK_INTERRUPTIBLE);

ipc_unlock_object(&msq->q_perm);
rcu_read_unlock();
schedule();
rcu_read_lock();

msg = READ_ONCE(msr_d.r_msg); /* 读出msg变量 */
if (msg != ERR_PTR(-EAGAIN))
goto out_unlock1;

ipc_lock_object(&msq->q_perm);

msg = msr_d.r_msg;
if (msg != ERR_PTR(-EAGAIN))
goto out_unlock0;

list_del(&msr_d.r_list);
if (signal_pending(current)) { /* 仅仅检查一下是否有信号,不处理信号 */
msg = ERR_PTR(-ERESTARTNOHAND);
goto out_unlock0;
}

ipc_unlock_object(&msq->q_perm);
}

out_unlock0:
ipc_unlock_object(&msq->q_perm);
wake_up_q(&wake_q);
out_unlock1:
rcu_read_unlock();
if (IS_ERR(msg)) {
free_copy(copy);
return PTR_ERR(msg);
}

bufsz = msg_handler(buf, msg, bufsz); /* 这里的msg_handler就是do_msg_fill */
free_msg(msg);

return bufsz;
}
  • 这里总结一下 do_msgrcv 的功能:
    • 调用 prepare_copy 为后面的复制做准备(在底层还是调用 load_msg 创建一个 msg_msg 结构体,再把数据拷贝到该 msg_msg 的后面)
    • 通过 msqid 计算得到 msg_queue 结构体地址
    • 调用 find_msg 查找可用的 msg_msg 结构体(这些 msg_msg 都是 msgsnd 发送出来的)
    • 调用 do_msg_fill->store_msg->copy_to_user 把 msg_msg 中的内容传输到用户态

msg VS Pipe

同样是进程间的通信,那么消息队列与管道相较而言有哪些优势和劣势:

  • 优点:
    • 消息队列收发消息自动保证了同步,不需要由进程自己来提供同步方法,而命名管道需要自行处理同步问题
    • 消息队列接收数据可以根据消息类型有选择的接收特定类型的数据,不需要像命名管道一样默认接收数据
  • 缺点:
    • 发送和接受的每个数据都有最大的长度限制