0%

Principles:Pipe底层原理

Pipe 简述

进程用户空间是相互独立的,一般而言是不能相互访问的,但很多情况下进程间需要互相通信,来完成系统的某项功能,进程通过与内核及其它进程之间的互相通信来协调它们的行为,管道就是作为进程间的一种通信方式

  • 内核申请一块缓存区,这个缓存区留有两个接口,分别接在两个不同的进程上
  • 这个缓冲区不需要很大,它被设计成为环形的数据结构,以便可以被循环利用:
    • 当管道中没有信息的话,从管道中读取的进程会等待,直到另一端的进程放入信息
    • 当管道被放满信息的时候,尝试放入信息的进程会等待,直到另一端的进程取出信息
    • 当两个进程都终结的时候,管道也自动消失

管道分为无名管道(pipe)和有名管道(FIFO)两种:

  • 无名管道:只能用于 公共祖先 的两个进程间的通信,原因是自己创建的管道在别的进程中并不可见
  • 有名管道:可用于同一系统中的任意两个进程间的通信

Pipe 案例

父子进程通信(无名管道):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>

/* 父子共享文件描述符 */

int main()
{
int fd[2];
pid_t pid;

int ret = pipe(fd);
if(ret==-1){
perror("pipe error:");
exit(1);
}
pid = fork();
if(pid==-1){
perror("fork error:");
exit(1);
}
else if(pid==0){ /* 子进程 读数据,关闭写端fd[1] */
close(fd[1]);
char buf[1024];

int rea = read(fd[0],buf,sizeof(buf));
if(rea==0){
perror("read finish\n");
}
/* 将读出的数据写到屏幕上 */
write(1, buf, rea);
close(fd[0]);
}
else{ /* 父进程写书据,关闭读端fd[0] */
close(fd[0]);
write(fd[1], "hello pipe\n", strlen("hello pipe\n"));
close(fd[1]);
}

return 0;
}
  • 父进程向管道中写入数据
  • 儿进程从管道中读取数据
  • 结果:
1
2
➜  exp ./pipe1
hello pipe

兄弟进程通信(无名管道):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <stdio.h>
#include <unistd.h>
#include <sys/wait.h>

int main(int argc, char *argv[])
{
int fd[2];
int ret = pipe(fd);
int i = 0;

if (ret == -1) {
perror("[pipe create file] ");
return 0;
}

for ( ; i < 2; i++) {
pid_t pid = fork();

if (pid == 0)
break;
if (pid == -1)
perror("[creator process file:]");
}

if (i == 0) { // child1
dup2(fd[1], 1); /* 设置child1的标准输出为pipe */
close(fd[0]);
execlp("ls", "ls", NULL);
} else if (i == 1) { // child2
dup2(fd[0], 0); /* 设置child2的标准输入为pipe */
close(fd[1]);
execlp("grep", "grep", "pipe", NULL);
} else if (i == 2) { // parent
close(fd[1]);
close(fd[0]);
int wpid;
while ( wpid = waitpid(-1, NULL, WNOHANG) != -1) { /* 回收子进程 */
/* 当waitpid调用次数过多时,也会返回'-1' */
if (wpid == 1) /* PID为'1'的是init进程(显然不可能死亡) */
continue;
if (wpid == 0) /* 如果设置了选项WNOHANG,而调用中waitpid发现没有已退出的子进程可收集,则返回0 */
continue;
printf("child dide pid = %d\n", wpid);
}
}
printf("pipeWrite = %d, pipeRead = %d\n", fd[1], fd[0]);
return 0;
}
  • 设置 child1 的标准输出为 pipe,执行 execlp("ls", "ls", NULL)
  • 设置 child2 的标准输入为 pipe,执行 execlp("grep", "grep", "pipe", NULL)
  • 先把 child1 的结果输出到 pipe 中,再把 pipe 中的数据输入到 child2
  • 结果:
1
2
3
4
5
6
7
8
9
10
11
➜  exp ./pipe2
pipe1
pipe1.c
pipe2
pipe2.c
pipeWrite = 4, pipeRead = 3
➜ exp ls | grep pipe
pipe1
pipe1.c
pipe2
pipe2.c
  • PS:waitpid() 函数详解
  • 调用了 waitpid(),父类就立即阻塞自己,由 waitpid() 自动分析是否当前进程的某个子进程是否已经退出:
    • 如果让它找到了这样一个已经变成僵尸的子进程,waitpid() 就会收集这个子进程的信息,并把它彻底销毁后返回
    • 如果没有找到这样一个子进程,waitpid() 就会一直阻塞在这里,直到有一个出现为止
1
pid_t waitpid(pid_t pid, int *status, int options)
  • pid>0 时:只等待进程ID等于pid的子进程,不管其它已经有多少子进程运行结束退出了,只要指定的子进程还没有结束,waitpid 就会一直等下去
  • pid=-1 时:等待任何一个子进程退出,没有任何限制,此时 waitpid() 和 wait() 的作用一模一样
  • pid=0 时:等待同一个进程组中的任何子进程,如果子进程已经加入了别的进程组,waitpid() 不会对它做任何理睬
  • pid<-1 时:等待一个指定进程组中的任何子进程,这个进程组的ID等于pid的绝对值
  • 无论哪种情况,WNOHANG 模式均不予等待(把上述案例中的 WNOHANG 去掉就很好验证)

两个无关进程通信(有名管道):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>

#define PATHNAME1 "./fifo1"
#define PATHNAME2 "./fifo2"

void print_err(char* str)
{
perror(str);
exit(-1);
}

char buf[1024] = {0};

int main()
{
int fd[2] = { 0 };
pid_t pid = fork();

if(pid>0) /* 父进程,负责接收数据 */
{
fd[0] = open(PATHNAME1,O_RDWR);
if(fd[0] == -1){
print_err("open fail:");
}
while(1)
{
read(fd[0],buf,sizeof(buf));
printf("rcv:%s",buf);
bzero(buf,sizeof(buf));
}
}
else if(pid == 0) /* 子进程,负责传输数据 */
{
fd[1] = open(PATHNAME2,O_RDWR);
if(fd[1] == -1){
print_err("open fail:");
}
while(1)
{
read(0,buf,sizeof(buf));
write(fd[1],buf,sizeof(buf));
bzero(buf,sizeof(buf));
}
}
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>

#define PATHNAME1 "./fifo1"
#define PATHNAME2 "./fifo2"

void print_err(char* str)
{
perror(str);
exit(-1);
}

char buf[1024] = { 0 };

int main()
{
int fd[2] = { 0 };
int ret[2] = { 0 };

ret[0] = mkfifo(PATHNAME1,0664); /* 创建一个有名管道-fifo1 */
if(ret[0] == -1 && errno!=EEXIST){
print_err("mkfifo fail:");
}
ret[1] = mkfifo(PATHNAME2,0664); /* 创建一个有名管道-fifo2 */
if(ret[1] == -1 && errno!=EEXIST){
print_err("mkfifo fail:");
}

pid_t pid = fork();

if(pid>0) /* 父进程,负责传输数据 */
{
fd[0] = open(PATHNAME1,O_RDWR);
if(fd[0] == -1){
print_err("open fail:");
}
while(1)
{
read(0,buf,sizeof(buf));
write(fd[0],buf,sizeof(buf));
bzero(buf,sizeof(buf));
}
}

if(pid == 0) /* 子进程,负责接收数据 */
{
fd[1] = open(PATHNAME2,O_RDWR);
if(fd[1] == -1){
print_err("open fail:");
}
while(1)
{
read(fd[1],buf,sizeof(buf));
printf("rcv data:%s",buf);
bzero(buf,sizeof(buf));
}
}
return 0;
}
  • 两次调用 mkfifo 创建一对“管道文件”(fifo1-负责写,fifo2-负责读)
  • 通过 open 这两个“管道文件”把两个进程联系起来
  • 结果:
1
2
3
4
5
6
7
➜  exp ./pipe3a                 
123 # 输入1
rcv:abc # 接收2

➜ exp ./pipe3b
rcv data:123 # 接收1
abc # 输入2

无名 Pipe 的实现

在 Linux 中,管道的实现借助了文件系统的 file 结构和 VFS 的索引节点 inode

  • 通过将两个 file 结构指向同一个临时的 VFS 索引节点
  • 而这个 VFS 索引节点又指向一块物理空间而实现的

管道描述符 pipe_inode_info,用于表示一个管道,存储管道相应的信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct pipe_inode_info {
struct mutex mutex; /* 互斥锁 */
wait_queue_head_t wait; /* 对于空/满管道的读/写等待点 */
unsigned int nrbufs, curbuf, buffers; /* 非空管道缓冲区的数量/当前管道缓冲区条目/缓冲区总数 */
unsigned int readers; /* 该管道的当前读者数量(每次以读方式打开时,readers加1,关闭时readers减1) */
unsigned int writers; /* 该管道的当前写者数量(每次以写方式打开时,writers加1,关闭时writers减1) */
unsigned int files; /* 引用此管道的file结构体数量 */
unsigned int waiting_writers; /* 被阻塞的管道写者数量 */
unsigned int r_counter; /* 管道读者记数器,每次以读方式打开管道时,r_counter加1,关闭是不变 */
unsigned int w_counter; /* 管道写者计数器,每次以写方式打开管道时,w_counter加1,关闭是不变 */
struct page *tmp_page; /* 页缓存,可以加速页帧的分配过程,当释放页帧时将页帧记入tmp_page,当分配页帧时,优先从tmp_page中获取(如果tmp_page为空才从伙伴系统中获取) */
struct fasync_struct *fasync_readers; /* 读端异步描述符 */
struct fasync_struct *fasync_writers; /* 写端异步描述符 */
struct pipe_buffer *bufs; /* 回环缓冲区(由16个pipe_buffer对象组成,每个pipe_buffer对象拥有一个内存页) */
struct user_struct *user; /* 创建此管道的用户 */
};

创建管道:

1
2
3
/* 用户态封装 */
int pipe(int pipefd[2]);
int pipe2(int pipefd[2], int flags);
1
2
3
4
5
6
7
8
9
10
/* 内核态入口 */
SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags)
{
return do_pipe2(fildes, flags); /* O_NONBLOCK:非阻塞,O_CLOEXEC:fork和exec时是否关闭 */
}

SYSCALL_DEFINE1(pipe, int __user *, fildes) /* pipe()系统调用 */
{
return do_pipe2(fildes, 0);
}
  • 函数 do_pipe2:核心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/*
* sys_pipe() is the normal C calling standard for creating
* a pipe. It's not the way Unix traditionally does this, though.
*/
static int do_pipe2(int __user *fildes, int flags)
{
struct file *files[2];
int fd[2];
int error;

error = __do_pipe_flags(fd, files, flags); /* 分配两个struct file数据结构,一个用来读,一个用来写 */
if (!error) {
if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) { /* 调用copy_to_user将两个fd拷贝至用户态 */
fput(files[0]); /* 将两个files归还 */
fput(files[1]);
put_unused_fd(fd[0]); /* 将两个fd归还 */
put_unused_fd(fd[1]);
error = -EFAULT;
} else {
fd_install(fd[0], files[0]); /* 对应fd下标的指针赋值为file */
fd_install(fd[1], files[1]);
}
}
return error;
}
  • 函数 __do_pipe_flags:创建两个 file 结构,并获取其文件描述符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static int __do_pipe_flags(int *fd, struct file **files, int flags)
{
int error;
int fdw, fdr;
/* 首先检查flag标志位 */
if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT))
return -EINVAL;
error = create_pipe_files(files, flags); /* 创建两个file结构 */
if (error)
return error;
error = get_unused_fd_flags(flags); /* 获取文件描述符fdr */
if (error < 0)
goto err_read_pipe;
fdr = error; /* 赋值为fdr */
error = get_unused_fd_flags(flags); /* 获取文件描述符fdw */
if (error < 0)
goto err_fdr;
fdw = error; /* 赋值为fdw */

audit_fd_pair(fdr, fdw);
fd[0] = fdr; /* read-0 */
fd[1] = fdw; /* write-1 */
return 0;

err_fdr:
put_unused_fd(fdr);
err_read_pipe:
fput(files[0]);
fput(files[1]);
return error;
}
  • 函数 create_pipe_files:创建两个 file 结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int create_pipe_files(struct file **res, int flags)
{
struct inode *inode = get_pipe_inode(); /* 首先为管道分配一个inode */
struct file *f;

if (!inode)
return -ENFILE;

/* 分配一个file write */
f = alloc_file_pseudo(inode, pipe_mnt, "",
O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT)),
&pipefifo_fops);
if (IS_ERR(f)) {
free_pipe_info(inode->i_pipe);
iput(inode);
return PTR_ERR(f);
}

f->private_data = inode->i_pipe; /* 设置file的私有数据为inode pipe */

/* 分配一个file read(其实就是直接复制file write) */
res[0] = alloc_file_clone(f, O_RDONLY | (flags & O_NONBLOCK),
&pipefifo_fops);
if (IS_ERR(res[0])) {
put_pipe_info(inode, inode->i_pipe);
fput(f);
return PTR_ERR(res[0]);
}

res[0]->private_data = inode->i_pipe; /* 设置file的私有数据为 inode pipe */
res[1] = f;
return 0;
}
  • 因为 file read 直接复制了 file write,所以它们的底层使用同一个 inode,指向同一片内存地址

打开管道:

1
2
/* 用户态封装 */
FILE *fdopen(int fildes, const char *mode);
1
2
3
4
5
6
7
/* 内核态入口 */
SYSCALL_DEFINE3(open, const char __user *, filename, int, flags, umode_t, mode)
{
if (force_o_largefile()) /* x86_64 恒定为 true */
flags |= O_LARGEFILE;
return do_sys_open(AT_FDCWD, filename, flags, mode); /* 其实底层还是open */
}

有名 Pipe 的实现

FIFO (First in First out) 为一种特殊的文件类型,它在文件系统中有对应的路径,当一个进程以读的方式打开该文件,而另一个进程以写的方式打开该文件,那么内核就会在这两个进程之间建立管道,所以 FIFO 实际上也由内核管理,不与硬盘打交道

  • 之所以叫 FIFO,是因为管道本质上是一个先进先出的队列数据结构,最早放入的数据被最先读出来,从而保证信息交流的顺序

FIFO 只是借用了文件系统来为管道命名(File System,命名管道是一种特殊类型的文件,因为 Linux 中所有事物都是文件,它在文件系统中以文件名的形式存在)

  • 写模式的进程向 FIFO 文件中写入
  • 读模式的进程从 FIFO 文件中读出
  • 当删除 FIFO 文件时,管道连接也随之消失

FIFO 的好处在于我们可以通过文件的路径来识别管道,从而让没有亲缘关系的进程之间建立连接

创建管道:

1
2
int mknod(const char * pathname , mode_t mode , dev_t dev);
int mkfifo(const char * pathname , mode_t mode);
  • 有名管道在底层的实现跟无名管道完全一致,区别只是命名管道会有一个全局可见的文件名以供别人 open() 打开使用
  • 创建完之后,其他进程就可以使用 open() read() write() 标准文件操作等方法进行使用了
  • 无论有名还是无名管道,它的文件描述都没有偏移量的概念,所以不能用 lseek 进行偏移量调整
  • 不管是 mknod 还是 mkfifo,底层都是这个系统调用:
1
2
3
4
0x7ffff7ecd900 <__xmknod+32>    syscall  <SYS_mknod>
path: 0x555555556019 ◂— 0x326f6669662f2e
mode: 0x11b4
dev: 0x0
  • 在内核中对应的函数为:
1
2
3
4
5
static inline long ksys_mknod(const char __user *filename, umode_t mode,
unsigned int dev)
{
return do_mknodat(AT_FDCWD, filename, mode, dev);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
long do_mknodat(int dfd, const char __user *filename, umode_t mode,
unsigned int dev)
{
struct dentry *dentry;
struct path path;
int error;
unsigned int lookup_flags = 0;

error = may_mknod(mode);
if (error)
return error;
retry:
dentry = user_path_create(dfd, filename, &path, lookup_flags); /* 查找路径中的最后一个项的父目录项 */
if (IS_ERR(dentry))
return PTR_ERR(dentry);

if (!IS_POSIXACL(path.dentry->d_inode))
mode &= ~current_umask();
error = security_path_mknod(&path, dentry, mode, dev);
if (error)
goto out;
switch (mode & S_IFMT) {
case 0: case S_IFREG: /* 普通文件(默认) */
error = vfs_create(path.dentry->d_inode,dentry,mode,true); /* 创建普通文件 */
if (!error)
ima_post_path_mknod(dentry);
break;
case S_IFCHR: case S_IFBLK: /* 字符设备文件/块设备文件 */
error = vfs_mknod(path.dentry->d_inode,dentry,mode,
new_decode_dev(dev)); /* 创建特殊文件(FIFO,插口,字符设备文件,块设备文件),new_decode_dev用于创建设备号 */
break;
case S_IFIFO: case S_IFSOCK: /* 有名管道FIFO/套接字 */
error = vfs_mknod(path.dentry->d_inode,dentry,mode,0);
break;
}
out:
done_path_create(&path, dentry); /* 减少path和dentry的计数 */
if (retry_estale(error, lookup_flags)) {
lookup_flags |= LOOKUP_REVAL;
goto retry;
}
return error;
}
  • 我们的目的是创建有名管道 FIFO,所以会调用 vfs_mknod:(创建 inode 的公共流程函数)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int vfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, dev_t dev)
{
int error = may_create(dir, dentry);

if (error)
return error;

if ((S_ISCHR(mode) || S_ISBLK(mode)) && !capable(CAP_MKNOD))
return -EPERM;

if (!dir->i_op->mknod) /* 保证"dir->i_op->mknod"不为空 */
return -EPERM;

error = devcgroup_inode_mknod(mode, dev);
if (error)
return error;

error = security_inode_mknod(dir, dentry, mode, dev);
if (error)
return error;

error = dir->i_op->mknod(dir, dentry, mode, dev); /* 根据文件系统决定 */
if (!error)
fsnotify_create(dir, dentry);
return error;
}
EXPORT_SYMBOL(vfs_mknod);
  • 对于 Ext4 文件系统来说,dir->i_op->mknod 实际上是调用 ext4_mknod
  • 接下来就是一些繁琐的工作了

Shell Pipe

最简单的 Shell Pipe:

1
ls | grep log.txt
  • 其实就是把 ls 的输出作为 grep 的输入

Shell 中通过 fork + exec 创建子进程来执行命令,如果是含管道的 Shell 命令,则管道前后的命令分别由不同的进程执行,然后 通过管道把两个进程的标准输入输出连接起来 ,就实现了管道

  • PS:感觉和 “兄弟进程通信案例” 中的实现思路差不多