Linux内核系列:进程间通信

匿名管道

适用场景:在父子线程间传递信息。
核心是pipe函数,创建了全双工的两个文件描述符。下图来自APUE十五章。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int main(){
int fd[2];
pid_t pid;
int err = pipe(fd);
char line[100];
if(err == -1){
cout << "pipe error" << endl;
}
if((pid = fork()) < 0){
cout << "fork error" << endl;
abort();
}
else if(pid > 0){
close(fd[0]);
char s[] = "hello";
write(fd[1], s, sizeof(s));
}
else{
close(fd[1]);
int n = read(fd[0], line, 100);
write(STDOUT_FILENO, line, n);
}
return 0;
}

命令管道

在Linux中使用mkfifo函数创建一个命令管道,可以使用此管道实现不同进程之间的通信。
命名管道和匿名的相同和不用是:

  • 相同:
    • 都用管道通信
    • 管道数据都存在内核内存的缓冲区
  • 不同:
    • 匿名不在磁盘建立管道文件,FIFO在磁盘建立文件
    • PIPE需要进程具有亲缘关系,FIFO不需要

读和写可以设置阻塞或者非阻塞:

  • 阻塞:
    • 写端等待数据被读走后才写
    • 读端等待数据写入后才读
  • 非阻塞:
    • 写端一直写,直到缓冲区满
    • 读端无论有无数据,立刻返回

下面是一个使用实例:

写端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#define FIFO_FIFE "./fiffo"
#define BUFF_SIZE PIPE_BUF
int main(){
if(mkfifo(FIFO_FIFE, 0666) < 0 && errno != EEXIST){
perror("create fifo file error");
return -1;
}
else{
char info[] = "0";
int fd = open(FIFO_FIFE, O_CREAT | O_WRONLY, 0666);
if(fd > 0){
int count = 60;
while(-- count){
write(fd, info, sizeof(info));
sleep(1);
cout << "writing" << info << " msg to fifo" << endl;
info[0] = (char)('0' + count);
}
char sig[] = "end";
write(fd, sig, sizeof(sig));
close(fd);
}
}
return 0;
}

读端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define FIFO_FIFE "./fiffo"
#define BUFF_SIZE PIPE_BUF
int main(){
if(mkfifo(FIFO_FIFE, 0666) < 0 && errno != EEXIST){
perror("create fifo file error");
return -1;
}
else{
char buf[BUFF_SIZE];
int fd = open(FIFO_FIFE, O_CREAT | O_RDONLY, 0666);
if(fd > 0){
while(true){
read(fd, buf, BUFF_SIZE);
sleep(1);
cout << "reading " << buf << " from fifo" << endl;
if(strcmp(buf, "end") == 0){
break;
}
}
close(fd);
}
}
return 0;
}

信号

信号是异步的,一个进程不必通过任何操作来等待信号的到达。
使用信号有两个作用:

  • 让进程直到发生了一个特定的事件
  • 强迫进程去执行信号处理程序

ps: 信号机制是一种特殊的软中断设计。

信号机制底层实现:

  1. 信号产生:内核更新目标进程的数据结构表示一个新信号已发送;
  2. 信号传递:内核强迫目标进程:
    • 改变进程的执行状态
    • 执行一个信号处理函数
  3. 注意:当信号产生但还未被传递,则称为挂起信号。进程执行信号处理函数时屏蔽同编号的信号。

信号处理的相关源码和细节参考这篇文章.
用一张图来概括:

下面写一个实例吧:
在新版系统中使用sigaction处理信号:

1
2
3
4
5
6
7
8
9
10
int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);

// 用 sigaction 结构取代了单一的 sighandler_t 函数指针
struct sigaction {
void (*sa_handler)(int); // 信号处理函数
void (*sa_sigaction)(int, siginfo_t *, void *); // 另一种替代的信号处理函数
sigset_t sa_mask; // 指定了应该被阻塞的信号掩码
int sa_flags; // 指定一组修改信号行为的标志
void (*sa_restorer)(void); // 应用程序不是使用这个成员
};

接收端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include<sys/types.h>
#include<sys/stat.h>
#include<unistd.h>
#include<fcntl.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<signal.h>
#include <iostream>
using namespace std;
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while( 0)

void handler( int, siginfo_t *, void *);


int main( int argc, char *argv[]){
struct sigaction act;
act.sa_sigaction = handler; //sa_sigaction与sa_handler只能取其一
//sa_sigaction多用于实时信号,可以保存信息
sigemptyset(&act.sa_mask);
act.sa_flags = SA_SIGINFO; // 设置标志位后可以接收其他进程
// 发送的数据,保存在siginfo_t结构体中

if (sigaction(SIGUSR1, &act, NULL) < 0)
ERR_EXIT( "sigaction error");
cout << "mypid:" << getpid() << endl;
for (; ;)
pause();

return 0;

}

void handler( int sig, siginfo_t *info, void *ctx)
{
printf( "recv a sig=%d data=%d data=%d\n",
sig, info->si_value.sival_int, info->si_int);
}


发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include<sys/types.h>
#include<sys/stat.h>
#include<unistd.h>
#include<fcntl.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<signal.h>

#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while( 0)



int main( int argc, char *argv[])
{
if (argc != 2)
{
fprintf(stderr, "Usage %s pid\n", argv[ 0]);
exit(EXIT_FAILURE);
}

pid_t pid = atoi(argv[ 1]); //字符串转换为整数
union sigval val;
val.sival_int = 100;
sigqueue(pid, SIGUSR1, val); // 只可以发信号给某个进程,而不能是进程组

return 0;

}

上面这两段程序在ubuntu 16.04虚拟机、linux16.04 系统运行正常,在wsl ubuntu 18.04下运行时,接受不到信号,暂时还没有找到原因。

信号量

信号量的本质就是对资源的一组控制原语,进程间可以通过信号量实现同步。

下面用信号量实现经典的生产者消费者问题, 下面使用线程实现的,但是原理差不多,主要就是解决同步问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#include<sys/types.h>
#include<sys/stat.h>
#include<unistd.h>
#include<fcntl.h>
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<signal.h>
#include <iostream>
#include <semaphore.h>
#include <pthread.h>
using namespace std;

typedef int item;
#define N 20
#define pSpeed 1
#define cSpeed 2

sem_t mutex, empty, full;
int head, tail;
int buf[N];

void showbuf(){
if(head > tail){
for(int i = tail; i <= head; ++ i){
cout << buf[i] << " ";
}
}
else if(head < tail){
for(int i = tail; i <= N; ++ i){
cout << buf[i] << " ";
}
for(int i = 0; i < head; ++ i){
cout << buf[i] << " ";
}
}
else{
for(int i = 0; i < N; ++ i){
cout << buf[i] << " ";
}
}
cout << endl;
return;
}

void* producer(void* id){
while(true){
item r = rand() % 100 + 1;
sleep(pSpeed);
sem_wait(&empty);
sem_wait(&mutex);
buf[head] = r;
head = (++ head) % N;
showbuf();
sem_post(&mutex);
sem_post(&full);


}
}

void* customer(void* id){
while(true){
sleep(cSpeed);
sem_wait(&full);
sem_wait(&mutex);
buf[tail] = 0;
tail = (++ tail) % N;
showbuf();
sem_post(&mutex);
sem_post(&empty);

}
}

int main( int argc, char *argv[]){
head = 0, tail = head;
if(sem_init(&mutex, 0, 1) == -1)
perror("sem_init error 1");
if(sem_init(&empty, 0, N) == -1)
perror("sem_init error 1");
if(sem_init(&full, 0, 0) == -1)
perror("sem_init error 1");
pthread_t p1, p2, c1, c2;

if(pthread_create(&p1, NULL, producer, (void*)"p1"))
perror("pthread_create");
if(pthread_create(&p2, NULL, producer, (void*)"p1"))
perror("pthread_create");
if(pthread_create(&c1, NULL, customer, (void*)"c1"))
perror("pthread_create");
if(pthread_create(&c2, NULL, customer, (void*)"c2"))
perror("pthread_create");
if(pthread_join(p1, NULL)) perror("pthread_join");
if(pthread_join(p2, NULL)) perror("pthread_join");
if(pthread_join(c1, NULL)) perror("pthread_join");
if(pthread_join(c2, NULL)) perror("pthread_join");
return 0;

}

共享内存

共享内存就是允许两个或多个进程共享一定的存储区。就如同 malloc() 函数向不同进程返回了指向同一个物理内存区域的指针。当一个进程改变了这块地址中的内容的时候,其它进程都会察觉到这个更改。因为数据不需要在客户机和服务器端之间复制,数据直接写到内存,不用若干次数据拷贝,所以这是最快的一种IPC。
注意:共享内存没有任何的同步与互斥机制,所以要使用信号量来实现对共享内存的存取的同步。

下面以mmap为例说明如何进行进程间通信:
注意需要先创建一个文件,文件名作为输入参数。
读端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <sys/mman.h>  
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <error.h>
#include <iostream>
using namespace std;
#define BUF_SIZE 100

int main(int argc, char **argv)
{
int fd, nread, i;
struct stat sb;
char *mapped, buf[BUF_SIZE];

for (i = 0; i < BUF_SIZE; i++) {
buf[i] = '#';
}
/* 打开文件 */
if ((fd = open(argv[1], O_RDWR)) < 0) {
perror("open");
}

/* 获取文件的属性 */
if ((fstat(fd, &sb)) == -1) {
perror("fstat");
}

/* 将文件映射至进程的地址空间 */
if ((mapped = (char *)mmap(NULL, sb.st_size, PROT_READ |
PROT_WRITE, MAP_SHARED, fd, 0)) == (void *)-1) {
perror("mmap");
}

/* 文件已在内存, 关闭文件也可以操纵内存 */
close(fd);

/* 每隔两秒查看存储映射区是否被修改 */
while (1) {
cout << mapped << endl;
sleep(2);
}

return 0;
}

写端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//进程B的代码
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <error.h>
#include <iostream>
using namespace std;

#define BUF_SIZE 100

int main(int argc, char **argv){
int fd, nread, i;
struct stat sb;
char *mapped, buf[BUF_SIZE];

for (i = 0; i < BUF_SIZE; i++) {
buf[i] = '#';
}

/* 打开文件 */
if ((fd = open(argv[1], O_RDWR)) < 0) {
perror("open");
}

/* 获取文件的属性 */
if ((fstat(fd, &sb)) == -1) {
perror("fstat");
}

/* 私有文件映射将无法修改文件 */
if ((mapped = (char *)mmap(NULL, sb.st_size, PROT_READ |
PROT_WRITE, MAP_SHARED, fd, 0)) == (void *)-1) {
perror("mmap");
}

/* 映射完后, 关闭文件也可以操纵内存 */
close(fd);


while(true){
/* 修改一个字符 */
mapped[0] = '9';
sleep(1);
cout << mapped << endl;
}
return 0;
}

消息队列

Linux的消息队列(queue)实质上是一个链表, 它有消息队列标识符(queue ID). msgget创建一个新队列或打开一个存在的队列; msgsnd向队列末端添加一条新消息; msgrcv从队列中取消息, 取消息是不一定遵循先进先出的, 也可以按消息的类型字段取消息。
下面实例说明如何利用linux msg系列系统调用来使用消息队列进行进程间通信:
发送端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <sys/mman.h>  
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <error.h>
#include <iostream>
#include <cstring>
#include <string>
using namespace std;

const int MAX_LEN = 20;
struct item{
int id;
char data[MAX_LEN];
};
#define handle_error(msg) \
perror(msg); \
exit(-1);
int main(int argc, char **argv) {
int msqid;
item it;
long int msgtype = 0;
msqid = msgget((key_t)10086, 0666 | IPC_CREAT);
if(msqid < 0){
handle_error("msg queue create error");
}
int count = 20;
while(-- count){
if(count == 1){
memcpy(it.data, "end", sizeof("end"));
}
else{
string temp = "number: " + to_string(count);
const char * msg = temp.c_str();
memcpy(it.data, msg, sizeof(msg));
}
cout << "sending:" << it.data << endl;
if(msgsnd(msqid, &it, sizeof(item), 0) < 0){
handle_error("send error");
}
sleep(1);
}
return 0;
}

接收端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//进程B的代码
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <error.h>
#include <iostream>
#include <cstring>
using namespace std;
const int MAX_LEN = 20;
struct item{
int id;
char data[MAX_LEN];
};
#define handle_error(msg) \
perror(msg); \
exit(-1);
int main(int argc, char **argv){
int msqid;
item it;
long int msgtype = 0;
msqid = msgget((key_t)10086, 0666 | IPC_CREAT);
if(msqid < 0){
handle_error("msg queue create error");
}

while(true){
if(msgrcv(msqid, &it, sizeof(item), msgtype, 0) < 0){
handle_error("msg rev error");
}
cout << "msg : " << it.data << endl;
if(strcmp(it.data, "end") == 0) break;
}

return 0;
}

Socket

常用于网络编程,在进程间通信时可以用,但是感觉大材小用了。在后续网络编程中会详细分析。