匿名管道
适用场景:在父子线程间传递信息。
核心是pipe函数,创建了全双工的两个文件描述符。下图来自APUE十五章。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| int main(){ int fd[2]; pid_t pid; int err = pipe(fd); char line[100]; if(err == -1){ cout << "pipe error" << endl; } if((pid = fork()) < 0){ cout << "fork error" << endl; abort(); } else if(pid > 0){ close(fd[0]); char s[] = "hello"; write(fd[1], s, sizeof(s)); } else{ close(fd[1]); int n = read(fd[0], line, 100); write(STDOUT_FILENO, line, n); } return 0; }
|
命令管道
在Linux中使用mkfifo函数创建一个命令管道,可以使用此管道实现不同进程之间的通信。
命名管道和匿名的相同和不用是:
- 相同:
- 不同:
- 匿名不在磁盘建立管道文件,FIFO在磁盘建立文件
- PIPE需要进程具有亲缘关系,FIFO不需要
读和写可以设置阻塞或者非阻塞:
- 阻塞:
- 非阻塞:
- 写端一直写,直到缓冲区满
- 读端无论有无数据,立刻返回
下面是一个使用实例:
写端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #define FIFO_FIFE "./fiffo" #define BUFF_SIZE PIPE_BUF int main(){ if(mkfifo(FIFO_FIFE, 0666) < 0 && errno != EEXIST){ perror("create fifo file error"); return -1; } else{ char info[] = "0"; int fd = open(FIFO_FIFE, O_CREAT | O_WRONLY, 0666); if(fd > 0){ int count = 60; while(-- count){ write(fd, info, sizeof(info)); sleep(1); cout << "writing" << info << " msg to fifo" << endl; info[0] = (char)('0' + count); } char sig[] = "end"; write(fd, sig, sizeof(sig)); close(fd); } } return 0; }
|
读端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| #define FIFO_FIFE "./fiffo" #define BUFF_SIZE PIPE_BUF int main(){ if(mkfifo(FIFO_FIFE, 0666) < 0 && errno != EEXIST){ perror("create fifo file error"); return -1; } else{ char buf[BUFF_SIZE]; int fd = open(FIFO_FIFE, O_CREAT | O_RDONLY, 0666); if(fd > 0){ while(true){ read(fd, buf, BUFF_SIZE); sleep(1); cout << "reading " << buf << " from fifo" << endl; if(strcmp(buf, "end") == 0){ break; } } close(fd); } } return 0; }
|
信号
信号是异步的,一个进程不必通过任何操作来等待信号的到达。
使用信号有两个作用:
- 让进程直到发生了一个特定的事件
- 强迫进程去执行信号处理程序
ps: 信号机制是一种特殊的软中断设计。
信号机制底层实现:
- 信号产生:内核更新目标进程的数据结构表示一个新信号已发送;
- 信号传递:内核强迫目标进程:
- 注意:当信号产生但还未被传递,则称为挂起信号。进程执行信号处理函数时屏蔽同编号的信号。
信号处理的相关源码和细节参考这篇文章.
用一张图来概括:
下面写一个实例吧:
在新版系统中使用sigaction处理信号:
1 2 3 4 5 6 7 8 9 10
| int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
struct sigaction { void (*sa_handler)(int); void (*sa_sigaction)(int, siginfo_t *, void *); sigset_t sa_mask; int sa_flags; void (*sa_restorer)(void); };
|
接收端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| #include<sys/types.h> #include<sys/stat.h> #include<unistd.h> #include<fcntl.h> #include<stdio.h> #include<stdlib.h> #include<errno.h> #include<string.h> #include<signal.h> #include <iostream> using namespace std; #define ERR_EXIT(m) \ do { \ perror(m); \ exit(EXIT_FAILURE); \ } while( 0)
void handler( int, siginfo_t *, void *);
int main( int argc, char *argv[]){ struct sigaction act; act.sa_sigaction = handler; sigemptyset(&act.sa_mask); act.sa_flags = SA_SIGINFO;
if (sigaction(SIGUSR1, &act, NULL) < 0) ERR_EXIT( "sigaction error"); cout << "mypid:" << getpid() << endl; for (; ;) pause();
return 0;
}
void handler( int sig, siginfo_t *info, void *ctx) { printf( "recv a sig=%d data=%d data=%d\n", sig, info->si_value.sival_int, info->si_int); }
|
发送端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| #include<sys/types.h> #include<sys/stat.h> #include<unistd.h> #include<fcntl.h> #include<stdio.h> #include<stdlib.h> #include<errno.h> #include<string.h> #include<signal.h>
#define ERR_EXIT(m) \ do { \ perror(m); \ exit(EXIT_FAILURE); \ } while( 0)
int main( int argc, char *argv[]) { if (argc != 2) { fprintf(stderr, "Usage %s pid\n", argv[ 0]); exit(EXIT_FAILURE); }
pid_t pid = atoi(argv[ 1]); union sigval val; val.sival_int = 100; sigqueue(pid, SIGUSR1, val);
return 0;
}
|
上面这两段程序在ubuntu 16.04虚拟机、linux16.04 系统运行正常,在wsl ubuntu 18.04下运行时,接受不到信号,暂时还没有找到原因。
信号量
信号量的本质就是对资源的一组控制原语,进程间可以通过信号量实现同步。
下面用信号量实现经典的生产者消费者问题, 下面使用线程实现的,但是原理差不多,主要就是解决同步问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| #include<sys/types.h> #include<sys/stat.h> #include<unistd.h> #include<fcntl.h> #include<stdio.h> #include<stdlib.h> #include<errno.h> #include<string.h> #include<signal.h> #include <iostream> #include <semaphore.h> #include <pthread.h> using namespace std;
typedef int item; #define N 20 #define pSpeed 1 #define cSpeed 2
sem_t mutex, empty, full; int head, tail; int buf[N];
void showbuf(){ if(head > tail){ for(int i = tail; i <= head; ++ i){ cout << buf[i] << " "; } } else if(head < tail){ for(int i = tail; i <= N; ++ i){ cout << buf[i] << " "; } for(int i = 0; i < head; ++ i){ cout << buf[i] << " "; } } else{ for(int i = 0; i < N; ++ i){ cout << buf[i] << " "; } } cout << endl; return; }
void* producer(void* id){ while(true){ item r = rand() % 100 + 1; sleep(pSpeed); sem_wait(&empty); sem_wait(&mutex); buf[head] = r; head = (++ head) % N; showbuf(); sem_post(&mutex); sem_post(&full); } }
void* customer(void* id){ while(true){ sleep(cSpeed); sem_wait(&full); sem_wait(&mutex); buf[tail] = 0; tail = (++ tail) % N; showbuf(); sem_post(&mutex); sem_post(&empty); } }
int main( int argc, char *argv[]){ head = 0, tail = head; if(sem_init(&mutex, 0, 1) == -1) perror("sem_init error 1"); if(sem_init(&empty, 0, N) == -1) perror("sem_init error 1"); if(sem_init(&full, 0, 0) == -1) perror("sem_init error 1"); pthread_t p1, p2, c1, c2;
if(pthread_create(&p1, NULL, producer, (void*)"p1")) perror("pthread_create"); if(pthread_create(&p2, NULL, producer, (void*)"p1")) perror("pthread_create"); if(pthread_create(&c1, NULL, customer, (void*)"c1")) perror("pthread_create"); if(pthread_create(&c2, NULL, customer, (void*)"c2")) perror("pthread_create"); if(pthread_join(p1, NULL)) perror("pthread_join"); if(pthread_join(p2, NULL)) perror("pthread_join"); if(pthread_join(c1, NULL)) perror("pthread_join"); if(pthread_join(c2, NULL)) perror("pthread_join"); return 0;
}
|
共享内存
共享内存就是允许两个或多个进程共享一定的存储区。就如同 malloc() 函数向不同进程返回了指向同一个物理内存区域的指针。当一个进程改变了这块地址中的内容的时候,其它进程都会察觉到这个更改。因为数据不需要在客户机和服务器端之间复制,数据直接写到内存,不用若干次数据拷贝,所以这是最快的一种IPC。
注意:共享内存没有任何的同步与互斥机制,所以要使用信号量来实现对共享内存的存取的同步。
下面以mmap为例说明如何进行进程间通信:
注意需要先创建一个文件,文件名作为输入参数。
读端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <error.h> #include <iostream> using namespace std; #define BUF_SIZE 100 int main(int argc, char **argv) { int fd, nread, i; struct stat sb; char *mapped, buf[BUF_SIZE]; for (i = 0; i < BUF_SIZE; i++) { buf[i] = '#'; } if ((fd = open(argv[1], O_RDWR)) < 0) { perror("open"); } if ((fstat(fd, &sb)) == -1) { perror("fstat"); } if ((mapped = (char *)mmap(NULL, sb.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == (void *)-1) { perror("mmap"); } close(fd); while (1) { cout << mapped << endl; sleep(2); } return 0; }
|
写端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <error.h> #include <iostream> using namespace std;
#define BUF_SIZE 100 int main(int argc, char **argv){ int fd, nread, i; struct stat sb; char *mapped, buf[BUF_SIZE]; for (i = 0; i < BUF_SIZE; i++) { buf[i] = '#'; } if ((fd = open(argv[1], O_RDWR)) < 0) { perror("open"); } if ((fstat(fd, &sb)) == -1) { perror("fstat"); } if ((mapped = (char *)mmap(NULL, sb.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == (void *)-1) { perror("mmap"); } close(fd); while(true){ mapped[0] = '9'; sleep(1); cout << mapped << endl; } return 0; }
|
消息队列
Linux的消息队列(queue)实质上是一个链表, 它有消息队列标识符(queue ID). msgget创建一个新队列或打开一个存在的队列; msgsnd向队列末端添加一条新消息; msgrcv从队列中取消息, 取消息是不一定遵循先进先出的, 也可以按消息的类型字段取消息。
下面实例说明如何利用linux msg系列系统调用来使用消息队列进行进程间通信:
发送端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| #include <sys/mman.h> #include <sys/stat.h> #include <sys/ipc.h> #include <sys/msg.h> #include <fcntl.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <error.h> #include <iostream> #include <cstring> #include <string> using namespace std; const int MAX_LEN = 20; struct item{ int id; char data[MAX_LEN]; }; #define handle_error(msg) \ perror(msg); \ exit(-1); int main(int argc, char **argv) { int msqid; item it; long int msgtype = 0; msqid = msgget((key_t)10086, 0666 | IPC_CREAT); if(msqid < 0){ handle_error("msg queue create error"); } int count = 20; while(-- count){ if(count == 1){ memcpy(it.data, "end", sizeof("end")); } else{ string temp = "number: " + to_string(count); const char * msg = temp.c_str(); memcpy(it.data, msg, sizeof(msg)); } cout << "sending:" << it.data << endl; if(msgsnd(msqid, &it, sizeof(item), 0) < 0){ handle_error("send error"); } sleep(1); } return 0; }
|
接收端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| #include <sys/mman.h> #include <sys/stat.h> #include <sys/ipc.h> #include <sys/msg.h> #include <fcntl.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <error.h> #include <iostream> #include <cstring> using namespace std; const int MAX_LEN = 20; struct item{ int id; char data[MAX_LEN]; }; #define handle_error(msg) \ perror(msg); \ exit(-1); int main(int argc, char **argv){ int msqid; item it; long int msgtype = 0; msqid = msgget((key_t)10086, 0666 | IPC_CREAT); if(msqid < 0){ handle_error("msg queue create error"); }
while(true){ if(msgrcv(msqid, &it, sizeof(item), msgtype, 0) < 0){ handle_error("msg rev error"); } cout << "msg : " << it.data << endl; if(strcmp(it.data, "end") == 0) break; }
return 0; }
|
Socket
常用于网络编程,在进程间通信时可以用,但是感觉大材小用了。在后续网络编程中会详细分析。