0%

并发

本文包含高并发相关的知识。以及一些高并发解决方案。

前置

并发

并发服务器可以同时处理多个请求,但它们是在同一个处理器或者计算机核心上交替执行的。在并发服务器中,服务器可以同时响应多个请求,但是每个请求必须等待前一个请求完成之后才能继续执行,因为它们共享同一个处理器资源。这种方式可以通过使用多线程、多进程、事件驱动等技术来实现,可以充分利用计算机的多核心资源。

虽然 CPU 在同一时刻只能执行一个任务,但是通过将 CPU 的使用权在恰当的时机分配给不同的任务,使得多个任务在视觉上看起来是一起执行的。CPU 的执行速度极快,多任务切换的时间也极短,用户根本感受不到,所以并发执行看起来才跟真的一样。

并行

并行服务器则是可以同时处理多个请求,并且这些请求是在多个处理器或计算机核心上并行执行的。在并行服务器中,不同的请求可以被分配到不同的处理器或计算机核心上并行执行,因此可以更快地完成任务。这种方式可以通过使用分布式计算、集群等技术来实现,可以充分利用多台或多核计算机的计算资源。

双核 CPU 执行两个任务时,每个核心各自执行一个任务,和单核 CPU 在两个任务之间不断切换相比,它的执行效率更高。

总结

因此,单核CPU上运行多进程或多线程,只能实现并发执行;如果在多核CPU上运行多进程或多线程,可以实现并行执行。虽然并发服务器和并行服务器都可以处理多个请求,但它们的处理方式和资源利用方式是不同的。

并发服务器主要是在单个处理器或计算机核心上交替处理多个请求,而并行服务器则是在多个处理器或计算机核心上同时处理多个请求。

IO 操作

阻塞 IO

阻塞式IO是一种同步IO操作方式,即在进行IO操作时,程序会一直阻塞等待,直到IO操作完成后才会继续执行下一步操作。在网络编程中,通常会使用阻塞式IO进行数据的读取和发送等操作。

优点: 操作简单易用

缺点: 只能处理一个IO操作,无法同时处理多个连接,可能会导致程序的效率低下。

非阻塞 IO

非阻塞式 IO 是一种异步 IO 操作方式,即在进行 IO 操作时,程序会立即返回,并继续执行其他操作,而不是一直等待IO操作完成。

程序可以通过不断查询 IO 操作的状态,来检测 IO 操作是否完成。在网络编程中,通常会使用非阻塞式 IO 配合轮询或信号等方式,来实现多个连接的同时处理。

优点: 可以处理多个连接,提高程序的并发性和响应速度

缺点: 增加了程序的复杂度,需要进行状态查询,可能会浪费CPU资源。

非阻塞 IO 的实现

在 Linux 中可以使用 fcntl 系统调用来设置非阻塞 IO

fcntl 是 Unix / Linux 系统下的一个系统调用函数,全称:file control
用于对已经打开的文件描述符进行一些控制操作,如复制文件描述符,修改文件状态标志等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
// 功能:对已经打开的文件描述符进行一些控制操作
// 参数:
// @fd : 需要操作的文件描述符
// @cmd : 控制命令,
// F_DUPFD:复制文件描述符,可以用来获取一个新的文件描述符,
// 该文件描述符与原来的描述符指向同一个文件。
// F_GETFD:获取文件描述符的标记值(close-on-exec)。
// F_SETFD:设置文件描述符的标记值。
// F_GETFL:获取文件状态标志。
// F_SETFL:设置文件状态标志。
// F_GETLK:获取文件锁信息。
// F_SETLK:设置文件锁。
// F_SETLKW:设置文件锁,如果无法获取锁,则阻塞等待锁的释放。
// F_GETOWN:获取文件所有者的进程。
// F_SETOWN:设置文件所有者的进程。
// @arg : 可选的参数,具体的含义和使用方式取决于cmd参数。如果不需要额外参数可写0或不写。
// O_APPEND:写入时追加到文件末尾。
// O_NONBLOCK:以非阻塞方式打开文件。
// O_SYNC:强制写入到磁盘。
// O_ASYNC:启用异步通知。
// 返回值: 成功返回0,失败返回-1

// 文件锁分为共享锁和独占锁两种类型,
// 可以通过fcntl函数设置和获取文件锁信息,实现文件读写的同步控制。

异步 IO

异步 IO 与非阻塞式 IO 相比,异步 IO 通过回调函数的方式来处理IO操作的完成事件,避免了阻塞和轮询等开销,程序的效率更高。

优点: 能够处理大量的并发IO请求,适合高并发场景

缺点: 是增加了程序的复杂度,代码可读性差,难以调试和维护。

异步模式 是一种编程模式,其特点是在调用一个耗时的操作时不会阻塞程序的执行,而是通过回调函数的方式在操作完成后再通知程序进行处理。在异步模式下,程序可以同时处理多个操作,提高了程序的并发性和响应速度。

内核事件轮询机制

内核事件轮询检测(Kernel polling)是一种用于检测设备或文件描述符是否有数据可读或可写的技术。

比如一般情况下,在计算中,当一个进程需要从非阻塞IO描述符中读取或写入数据时,通常会使用一个循环,不停的检测IO接口描述符中是否有数据产生,如果有就读取,如果没有就继续循环检测,但这样会大量的无效轮询,浪费大量的CPU资源。

内核事件轮询检测技术 可以解决这个问题,
它通过在内核中注册一个回调函数,当设备或文件描述符的状态发生变化时,内核会调用该回调函数,通知进程可以进行读取或写入操作。这种方式避免了进程在等待过程中的无效轮询,从而提高了CPU的利用率。

异步模式通常使用事件循环来实现,程序通过注册事件和回调函数的方式告诉事件循环要监听哪些事件,当事件发生时,事件循环会自动调用相应的回调函数进行处理。

事件循环可以使用操作系统提供的API(epollkqueue 等)或者第三方库(libeventlibuv 等)来实现。

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <stdio.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <signal.h>
// 修改文件描述符的行为或属性的方式:
// fcntl函数:
int set_nonblock(int fd)
{
//获取文件描述符操作状态
int flags = fcntl(fd, F_GETFL, 0);
//增加文件描述符操作状态为非阻塞.
flags |= O_NONBLOCK;
//用新的状态设置文件描述符。
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl err:");
return -1;
}
return 0;
}

// 当异步信号发出时,自动执行的函数:槽函数:
static int mouse_fd = -1;
void async_slots_functions()
{
char buf[128] = {0};
int nbytes = read(mouse_fd, buf, sizeof(buf) - 1);
if (nbytes == -1)
{
perror("read err:");
usleep(10000);
}
static int i = 0;
printf("%d读取了%d \n",i++, nbytes);
}

// 设置异步IO通知:
int set_async(int fd)
{
//1.获取文件描述符的操作行为的状态。
int flags = fcntl(fd, F_GETFL, 0);
//2.添加异步通知操作状态。
flags |= O_ASYNC;
//3.设置状态到描述符之中
if (fcntl(fd, F_SETFL, flags) == -1)
{
perror("fcntl err:");
return -1;
}
//4.设置文件描述符异步通知的进程为:当前进程。
if (fcntl(fd, F_SETOWN, getpid()) == -1)
{
perror("fcntl err:");
return -1;
}
//5.设置异步通知回调的执行函数:异步信号处理槽函数:
signal(SIGIO, async_slots_functions);
return 0;
}

int main(int argc, char const *argv[])
{
mouse_fd = open("/dev/input/mouse0", O_RDONLY);
if (mouse_fd == -1)
{
perror("open err:");
return -1;
}
// 用户数据缓冲区:
char buf[128] = {0};
// set_nonblock(mouse_fd);
set_async(mouse_fd);
while (true)
{
static int i = 0;
printf("%d大家好,才是真的好!\n", i++);
sleep(1);
}
return 0;
}

IO 多路复用

IO多路复用本身是使用单个线程来同时处理多个连接的IO请求。在网络编程中,常见的IO多路复用方式包括selectpollepoll等。

优点: 可以避免创建大量的线程或进程来处理IO请求,提高了程序的效率,同时可以处理大量的并发IO请求。

缺点: 增加了程序的复杂度,需要进行状态查询,同时IO多路复用的实现方式不同,可能会存在性能差异。

总结

不同的IO操作方式各有优缺点,应根据具体场景和需求选择合适的方式。

高并发 的网络编程中,可以使用 IO 多路复用技术来实现非阻塞式IO操作,同时避免了线程和进程的创建,提高了程序的效率。
低并发 的场景中,可以使用阻塞式IO操作,简单易用。

高性能高并发 的网络编程中,可以结合使用多种IO操作方式,如使用异步IO技术来处理大量的并发IO请求,使用IO多路复用技术来实现非阻塞式IO操作,提高程序的效率和响应速度。

非阻塞式IOIO多路复用 技术通常被使用在 高并发 的网络编程中。
非阻塞式IO 通过轮询或信号等方式来实现多个连接的同时处理,可以减少阻塞等待IO操作完成的时间,提高程序的效率。
IO多路复用 技术则是通过一个线程来同时处理多个连接的IO请求,减少了线程和进程的创建,降低了系统开销,提高了程序的效率和响应速度。

多进程并发

多进程技术在其他文章已经说明,这里不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);
// 功能:注册信号处理函数
// 参数:
// @signum : 表示所要处理的信号的编号。
// @act : 是一个指向struct sigaction类型的结构体指针,该结构体定义了对该信号的处理方式,
// 包括指定处理函数、处理标志等。如果该参数为NULL,则忽略信号。
// sigaction {
// void (*sa_handler)(int); // 信号处理函数指针
// void (*sa_sigaction)(int, siginfo_t *, void *); // 替代的信号处理函数指针
// sigset_t sa_mask; // 额外屏蔽的信号集
// int sa_flags; // 用于指定信号处理的标志
// void (*sa_restorer)(void); // 过时的恢复函数指针
// };
// @oldact : 参数是一个指向struct sigaction类型的结构体指针,用于保存原来的信号处理方式。
// 如果该参数为NULL,则忽略原来的信号处理方式。
// 返回值:
// 成功时返回0,失败时返回-1。

concurrence 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <string.h>
#include <unistd.h>
#include <stdbool.h>
#include <wait.h>
#include <stdlib.h>

//回收资源的函数:
void recyle_res()
{
int pid = waitpid(-1,NULL,0);
printf("回收了子进程%d的资源\n",pid);
}

int main(int argc, char const *argv[])
{
// 1.创建流式监听套接字类型:
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_fd == -1)
{
perror("socket() err:");
return -1;
}
// 2.定义一个网络地址信息结构体:
struct sockaddr_in serverInfo;
memset(&serverInfo, 0, sizeof(serverInfo));
serverInfo.sin_family = AF_INET;
// 端口号都是2字节,short类型,所以一定要注意字节序的问题:
serverInfo.sin_port = htons(9999);
serverInfo.sin_addr.s_addr = inet_addr("192.168.250.100");
// 3.绑定网络地址信息结构体:
int ret = bind(listen_fd, (const struct sockaddr *)&serverInfo, sizeof(serverInfo));
if (ret == -1)
{
perror("bind err:");
return -1;
}
// 4.设置监听的状态:
ret = listen(listen_fd, 1);
if (ret == -1)
{
perror("listen err:");
return -1;
}
printf("多进程服务器启动\n");
while (true)
{
int connect_fd = accept(listen_fd,NULL,NULL);
if(connect_fd == -1)
{
perror("connect err:");
return -1;
}
//创建子进程来处理链接套接字中的事务:
int pid = fork();
if(pid == 0)//子进程
{
char buf[128] = {0};
while (true)
{
memset(buf,0,sizeof(buf));
int nbytes = read(connect_fd,buf,sizeof(buf)-1);
if(nbytes == -1)
{
perror("read err:");
continue;
}
if(nbytes == 0)
{
printf("对方断开链接\n");
close(connect_fd);
exit(0);//SIGCHILD
}
printf("客户端发来的数据:%s \n",buf);
//回显示:
nbytes = write(connect_fd,buf,strlen(buf));
if(nbytes == -1)
{
perror("write err:");
continue;
}

}
}
else if(pid > 0)//父进程:
{
//认识信号注册函数sigaction:
struct sigaction sa = {0};
sa.sa_handler = recyle_res;
sa.sa_flags = SA_RESTART;
//异步信号处理子进程的资源回收的问题:
sigaction(SIGCHLD,&sa,NULL);
continue;
}
else{
perror("fork err:");
return -1;
}
}
return 0;
}

多进程并发服务器最大可并发的连接是有限的

1
ulimit -a # 查看系统的资源限制

多线程并发

多进程技术在其他文章已经说明,这里不再赘述。

在 Linux 中一个进程可以创建子线程数量可以通过以下路径配置文件查看:

1
$ cat /proc/sys/kernel/threads-max #也可以通过修改这个配置文件中的数量增加线程量。

理论上一个进程,可以创建很多线程,但一个进程可以创建的最大线程数受到许多因素的影响,例如系统硬件资源、进程的配置、线程的堆栈大小、进程的虚拟内存限制等等。

在 Linux 中,每个线程都有一个独立的内核栈,用于存储线程的执行上下文、局部变量、函数调用栈等信息。每个线程的内核栈的大小是固定的,并且由系统内核在创建线程时分配和初始化。

每个线程都有两种类型的栈:用户栈和内核栈。
用户栈 用于存储线程的执行上下文、局部变量、函数调用栈等信息,是线程执行代码时的主要工作区域。
内核栈 用于存储线程在内核中执行时所需要的状态信息,例如系统调用、中断处理等。
内核栈和用户栈是两个独立的栈,分别用于不同的目的。

线程的内核栈和用户栈在概念上是不同的,但在实际实现中,它们可能使用同一块物理内存。例如,在 x86 架构上,线程的内核栈和用户栈都是在同一块物理内存区域中,但是它们使用不同的段寄存器来访问这个内存区域。

注意: 线程的内核栈和用户栈的大小是可以独立设置的。在大多数情况下,
用户栈 的大小由编译器和链接器决定
内核栈 的大小可以通过调用 pthread_attr_setstacksize 函数来设置。
线程的用户栈和内核栈在使用上有一些不同之处,但它们共同构成了线程的执行环境。在大多数 Linux 系统中,线程的内核栈的大小通常在 2MB 左右。

线程的内核栈的大小是与线程的数量有关的。如果系统中创建了大量的线程,并且每个线程的内核栈大小都很大,那么可能会导致系统资源不足,从而影响系统的性能和稳定性。因此,在创建线程时,需要根据实际应用的需求和系统的硬件资源情况来设置线程的内核栈大小,以达到最佳的性能和可靠性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <stdio.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <string.h>
#include <unistd.h>
#include <stdbool.h>
#include <stdlib.h>
#include <pthread.h>

//子线程执行的函数:
void * subThreadTask(void* arg)
{
int connect_fd = *(int*)arg;
char buf[128] = {0};
while (true)
{
memset(buf,0,sizeof(buf));
int nbytes = read(connect_fd,buf,sizeof(buf)-1);
if(nbytes == -1)
{
continue;
}
if(nbytes == 0)
{
printf("对方已经关闭了\n");
break;
}
//打印一下:
printf("客户端发来的数据: %s \n",buf);
//回显服务器:
nbytes = write(connect_fd,buf,strlen(buf));
if(nbytes == -1)
{
perror("write err:");
break;
}
}
}
int main(int argc, char const *argv[])
{
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
perror("socket err:");
return -1;
}
//创建网络信息结构体:
struct sockaddr_in serverInfo = {0};
serverInfo.sin_family = AF_INET;
serverInfo.sin_port = htons(8080);
serverInfo.sin_addr.s_addr = INADDR_ANY;
//bind:
int ret = bind(listen_fd,(struct sockaddr*)&serverInfo,sizeof(serverInfo));
if(ret == -1)
{
perror("bind err:");
return -1;
}

//设置套接字为监听套接字:
ret = listen(listen_fd,5);
if(ret == -1)
{
perror("liseten err:");
return -1;
}
printf("多线程并发服务器启动\n");
while (true)
{
int connect_fd = accept(listen_fd,NULL,NULL);
if(connect_fd == -1)
{
continue;
}
//1.创建子线程:
pthread_t subThreadId;
pthread_create(&subThreadId,NULL,subThreadTask,(void*)&connect_fd);
//2.设定为分离态:
pthread_detach(subThreadId);
}
return 0;
}

IO 多路复用

IO多路复用 IO multiplexing 是一种高效的网络编程技术,可以在单个线程中处理多个I/O事件,从而提高程序的并发性能和效率,它允许一个进程同时监听多个文件描述符 sockets,并等待其中任何一个文件描述符可读或可写事件产生时立即响应。

在Linux中,常用的IO多路复用机制包括 selectpollepoll
相比于阻塞IO和非阻塞IO,IO多路复用的优势在于可以避免轮询、提高IO响应速度,同时也可以减少CPU占用率。

IO 多路复用的方式

selectpollepoll 都是 I/O 多路复用的机制,它们的目的是:在不创建新线程的情况下支持多客户端连接,并实现高并发的网络编程。因此,它们建立的服务器一般都是单线程的。

这些机制的工作原理是通过单个线程同时监听多个客户端连接的事件,当有事件发生时,再对相应的事件进行处理。由于单个线程只需要负责监听和处理 I/O 事件,所以无需创建多个线程,就能支持多客户端连接。

使用流程

  1. 首先要创建一个文件描述符的集合。(数据结构)
  2. 把关心的文件描述符放入到这个集合之中。
  3. 使用 selectpollepoll 等这个 linux 中的系统调用,把关心的文件描述符的集合放在入到内核态中,由内核事件监测事件机制来监测集合中描述的事件的产生。
  4. 当事件产生时,相应的 selectpoll、或 epoll_wait 函数则马上解除阻塞,并返回有事件的文件描述符的个数。
  5. 在进程中马上处理这些有事件的文件描述符。

concurrence 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
// 1. fd_set为文件描述符集合类型,是一个结构体:
#define FD_SETSIZE = 1024
typedef struct fd_set {
unsigned long fds_bits[FD_SETSIZE / (8 * sizeof(long))];//位图
} fd_set;
//所在fd_set结构体类型是一个位域数组。共1024个bit位的数组,所以它可以保存1024描述符的是否就绪状态。

// 2. select接口API:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
// 功能:监控一组文件描述符的状态,当其中有描述符就绪时,函数返回,并且可以知道哪些描述符已经就绪。
// 参数:
// @nfds : 需要监控的文件描述符的个数,即文件描述符集合中最大的文件描述符加1。
// @readfds : 读文件描述符集合。
// @writefds : 写文件描述符集合。
// @exceptfds : 异常文件描述符集合。
// @timeout : 超时时间,如果在指定的时间内没有任何文件描述符就绪,则返回0,否则返回就绪的文件描述符的个数。
// 返回值:
// 成功时返回就绪的文件描述符的个数,失败时返回-1。

// 配合select完成任务的几个宏函数:
// 1.用于将指定的文件描述符fd从文件描述符集合set中删除。
void FD_CLR(int fd, fd_set *set);

// 2.用于判断指定的文件描述符fd是否在文件描述符集合set中,如果存在,则返回非零值,否则返回0。
int FD_ISSET(int fd, fd_set *set);

// 3.用于将指定的文件描述符fd加入到文件描述符集合set中
void FD_SET(int fd, fd_set *set);

// 4.用于将文件描述符集合清空,即将所有位都设为0
void FD_ZERO(fd_set *set);

select 并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/ip.h>
#include <netinet/in.h>

int main(int argc, char const *argv[])
{
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
perror("socket err:");
return -1;
}
//构建信息结构体:
struct sockaddr_in serverInfo = {0};
serverInfo.sin_family = AF_INET;
serverInfo.sin_port = htons(8080);
serverInfo.sin_addr.s_addr = INADDR_ANY;
//bind:
int ret = bind(listen_fd,(struct sockaddr*)&serverInfo,sizeof(serverInfo));
if(ret == -1)
{
perror("bind err:");
return -1;
}

//把套接字设定为监听状态:
ret = listen(listen_fd,10);
if(ret == -1)
{
perror("listen err:");
return -1;
}

//设定文件描述符的集合:
fd_set readfd_save_set,readfd_modify_set;
FD_ZERO(&readfd_save_set);
//把要关心文件描述符放入到集合之中:
FD_SET(listen_fd,&readfd_save_set);

//设置个最大描述符:
int maxfd = listen_fd;
//定义一个用户数据收发的缓冲buf;
char buf[128] = {0};

//开启一个循环:
printf("开启一个小型的select并发服务器\n");
while(true)
{
readfd_modify_set = readfd_save_set;
int fds = select(maxfd + 1,&readfd_modify_set,NULL,NULL,NULL);
if(fds == -1)
{
perror("select err:");
continue;
}
//一定有事件产生:
for(int event_fd = 3;event_fd < maxfd + 1;event_fd++)
{
if(FD_ISSET(event_fd,&readfd_modify_set))
{
if(event_fd == listen_fd)//有新的客户端的链接。
{
int connect_fd = accept(event_fd,NULL,NULL);
if(connect_fd == -1)
{
perror("accept err:");
continue;
}
FD_SET(connect_fd,&readfd_save_set);
maxfd = maxfd > connect_fd ? maxfd : connect_fd;
printf("客户端与服务建立了连接\n");

}
else{//有数据可读,意味着可以收发数据了。
memset(buf,0,sizeof(buf));
int nbytes = read(event_fd,buf,sizeof(buf)-1);
if(nbytes == -1)
{
perror("read err:");
continue;
}
if(nbytes == 0)
{
printf("客户端已经断开了连接\n");
FD_CLR(event_fd,&readfd_save_set);
close(event_fd);
continue;
}
//打印一下对方发来的数据:
printf("客户端发来的数据:%s \n",buf);
//回显:
nbytes = write(event_fd,buf,strlen(buf));
if(nbytes == -1)
{
perror("write err:");
continue;
}
}
}
}
}
return 0;
}

select 的缺点

select

  1. 描述符多时会低效:每次调用 select 函数时,都会将用户态中的描述符拷贝到内核态,由内核来检测事件的产生,在有事件时,他遍历整个集合,并将相应的位置 1,同时把整个集合再次从内核态再拷贝到用户态,这样会带来很大的开销。当文件描述符集合很大时,这种开销就更加明显。
  2. 只支持较小的文件描述符集合:在一些操作系统中,select 函数对文件描述符集合的大小有限制,一般为1024。这意味着当需要同时监控的文件描述符数量很大时,就不能使用 select 函数了。
  3. 事件处理效率低:当多个文件描述符有事件发生时,select函数只能依次处理每个文件描述符,这样就会导致事件处理效率低下。

总之,select 函数在高并发场景下的表现并不理想,一些新的 IO 多路复用技术,如 epollkqueue 等,已经被广泛采用,以提高性能和可扩展性。

poll 并发

poll构建并发服务器的思路与select是一样的,唯一的区别是:要监测的文件描述符的集合使用的数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
// 功能:监控一组文件描述符的状态,当其中有描述符就绪时,函数返回,并且可以知道哪些描述符已经就绪。
// 参数:
// @fds : 文件描述符集合,是一个指向 pollfd 结构体数组的指针。
// struct pollfd {
// int fd; // 文件描述符,初始设置为-1,将被忽略;
// short events; // 要监视的事件 event mask short
// short revents; // 实际发生的事件 real event mask short
// };
// //额外知识点:掩码主要是用来分类或分段的。可以理解为区分的。
// @nfds : fds 数组中有效描述符元素的数量
// @timeout : timeout 是等待时间,单位为毫秒。-1为阻塞
// 返回值:
// 成功时返回就绪的文件描述符的个数,失败时返回-1。

需要注意的是,poll 函数在处理大量文件描述符时的性能可能较低,因为每次调用 poll 函数都需要遍历整个 pollfd 数组。为了提高性能,可以使用 epoll 函数来代替 poll 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/ip.h>
#include <netinet/in.h>
#include <poll.h>
int main(int argc, char const *argv[])
{
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
perror("socket err:");
return -1;
}
//创建网络信息结构体:
struct sockaddr_in serverInfo = {0};
serverInfo.sin_family = AF_INET;
serverInfo.sin_port = htons(8080);
serverInfo.sin_addr.s_addr = INADDR_ANY;

//bind信息结构体:
int ret = bind(listen_fd,(struct sockaddr*)&serverInfo,sizeof(serverInfo));
if(ret == -1)
{
perror("bind err:");
return -1;
}
//把套接字设定为监听态:
ret = listen(listen_fd,5);
if(ret == -1)
{
perror("liste err:");
return -1;
}
//开启构建poll中型并发服务器:
//1.创建描述的集合:pollfd事件结构体数组:
struct pollfd fdArray[2048] = {0};
for(int i = 0; i < 2048; i++)
{
fdArray[i].fd = -1;
}
//2.把关心的文件描述符放入到集合,就这是事件结构体数组;
int nfds = 0;
fdArray[nfds].fd = listen_fd;//关心的文件描述符。
fdArray[nfds].events = POLLIN;//关心的事件。
nfds++;
//用户收入数据缓冲区buf:
char buf[128] = {0};
//3.进入并发循环:
printf("poll中型并发服务器启动\n");
while(true)
{
int n_fd = poll(fdArray,nfds,-1);
if(n_fd == -1)
{
perror("poll err:");
return -1;
}
//遍历集合:
for(int i = 0; i < nfds;i++)
{
//判断集合中的元素是否有你关系的事件:通过revents属性&运算来判断。
if(fdArray[i].revents & POLLIN)
{
if(fdArray[i].fd == listen_fd)//有链接请求:
{
int connect_fd = accept(listen_fd,NULL,NULL);
if(connect_fd == -1)
{
continue;
}
fdArray[nfds].fd = connect_fd;
fdArray[nfds].events = POLLIN;
nfds++;
}
else{//有收发数据的请求:
memset(buf,0,sizeof(buf));
int nbytes = read(fdArray[i].fd,buf,sizeof(buf)-1);
if(nbytes == -1)
{
perror("read err:");
return -1;
}
if(nbytes == 0)
{
printf("对方服务器已经关闭\n");
//关闭套接字的一种方式:
nfds--;
close(fdArray[i].fd);
fdArray[i] = fdArray[nfds];
continue;
}
//打印一下客户端发来的数据:
printf("客户端发来的数据为:%s \n",buf);
//回显:
nbytes = write(fdArray[i].fd,buf,strlen(buf));
if(nbytes == -1)
{
perror("write err:");
continue;
}
}
}
}
}
return 0;
}

epoll 并发

epoll 是Linux系统中用于高效处理大量并发连接的一种机制,相比于传统的select和poll函数,具有更高的性能和更好的扩展性,是实现高性能网络服务器的重要工具之一。

epoll 机制中,内核使用两个数据结构来维护被监视的文件描述符和对应的事件:一个是红黑树,用于快速查找某个文件描述符的状态;另一个是双向链表,用于保存发生了事件的文件描述符。

具体来说,每个被监视的文件描述符都会在内核中对应一个 epitem 结构体,其中包含了文件描述符的状态信息,如感兴趣的事件类型、事件状态、回调函数等。所有的 epitem 结构体通过一个红黑树连接起来,红黑树的每个节点代表一个文件描述符。

当某个文件描述符上发生了感兴趣的事件时,内核会将该文件描述符的 epitem 结构体从红黑树中取出,并将其插入到一个双向链表中,这个双向链表的头部是 epoll 实例的就绪列表。程序调用 epoll_wait 函数时,会从这个就绪列表中取出已经准备就绪的文件描述符,并将相应的事件返回给程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <sys/epoll.h>
int epoll_create(int size);
// 功能:创建一个 epoll 实例,返回一个文件描述符,用于后续的 epoll 函数调用。
// 参数:
// @size : epoll 实例的大小,该参数在创建 epoll 实例时会被忽略,只是用于内核的优化。
// 返回值:
// 成功时返回一个文件描述符,失败时返回-1, 并设置 errno。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 功能:向 epoll 实例中添加、删除或修改文件描述符。
// 参数:
// @epfd : epoll 实例的文件描述符。
// @op : 操作类型
// EPOLL_CTL_ADD : 向 epoll 实例中添加文件描述符。
// EPOLL_CTL_MOD : 修改 epoll 实例中的文件描述符。
// EPOLL_CTL_DEL : 从 epoll 实例中删除文件描述符。
// @fd : 需要添加、删除或修改的文件描述符。
// @event : 事件类型,是一个 epoll_event 结构体。
// struct epoll_event {
// uint32_t events;
// epoll_data_t data;
// };
// typedef union epoll_data {
// void *ptr;
// int fd;
// uint32_t u32;
// uint64_t u64;
// } epoll_data_t;
// 返回值:
// 成功时返回0,失败时返回-1, 并设置 errno。

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// 功能:等待文件描述符上的事件发生。
// 参数:
// @epfd : epoll 实例的文件描述符。
// @events : 用于保存事件的数组。
// @maxevents : events 数组的大小。
// @timeout : 等待时间,单位为毫秒
// -1 : 阻塞等待,直到有事件发生。
// 0 : 不阻塞,立即返回。
// >0 : 等待指定的时间。
// 返回值:
// 成功时返回就绪的文件描述符的个数,失败时返回-1, 并设置 errno。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/ip.h>
#include <netinet/in.h>
#include <sys/epoll.h>
int main(int argc, char const *argv[])
{
int listen_fd = socket(AF_INET,SOCK_STREAM,0);
if(listen_fd == -1)
{
perror("socket err:");
return -1;
}
//创建网络信息结构体:
struct sockaddr_in serverInfo = {0};
serverInfo.sin_family = AF_INET;
serverInfo.sin_port = htons(8080);
serverInfo.sin_addr.s_addr = INADDR_ANY;

//bind信息结构体:
int ret = bind(listen_fd,(struct sockaddr*)&serverInfo,sizeof(serverInfo));
if(ret == -1)
{
perror("bind err:");
return -1;
}
//把套接字设定为监听态:
ret = listen(listen_fd,5);
if(ret == -1)
{
perror("liste err:");
return -1;
}
//创建一个文件描述的集合:
int epfd = epoll_create1(0);
if(epfd == -1)
{
perror("epoll_create1 err:");
return -1;
}
//把关心的文件描述符放入到epoll实例的红黑树中。
//定义一个事件结构体,把感兴趣的文件描述符与感兴趣事件,加入到epoll实例之中:
struct epoll_event ev = {0};
ev.data.fd = listen_fd;
ev.events = EPOLLIN;
ret = epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
if(ret == -1)
{
perror("epoll_ctl err:");
return -1;
}
//定义一个接收内核发来的有事件的(已经就绪的)文件描述符的集合(数据结构)
struct epoll_event event_fdArray[256] = {0};
//获取最大接收的有事件的个数:
int max_events = sizeof(event_fdArray) / sizeof(struct epoll_event);

//定义一个用户数据缓冲区buf:
char buf[128] = {0};
//进入并发循环:
printf("epoll大型并发服务器启动\n");
while(true)
{
int nfds = epoll_wait(epfd,event_fdArray,max_events,-1);
if(nfds == -1)
{
perror("epoll_wait err:");
return -1;
}
//遍历事件就绪的描述符的集合:
for(int i = 0; i < nfds; i++)
{
if(event_fdArray[i].data.fd == listen_fd)//有链接的请求:
{
int connect_fd = accept(listen_fd,NULL,NULL);
if(connect_fd == -1)
{
perror("accept err:");
continue;
}
ev.data.fd = connect_fd;
ev.events = EPOLLIN;
ret = epoll_ctl(epfd,EPOLL_CTL_ADD,connect_fd,&ev);
}
else{//有数据读写的请求:
memset(buf,0,sizeof(buf));
int nbytes = read(event_fdArray[i].data.fd,buf,sizeof(buf)-1);
if(nbytes == -1)
{
perror("read err:");
continue;
}
if(nbytes == 0)
{
printf("客户端关闭了连接\n");
ret = epoll_ctl(epfd,EPOLL_CTL_DEL,event_fdArray[i].data.fd,&event_fdArray[i]);
if(ret == -1)
{
perror("epoll_ctl err:");
continue;
}
//想着关闭无效已经关闭套接字资源。
close(event_fdArray[i].data.fd);
continue;
}
printf("客户端发来的数据:%s \n",buf);
//回显一下:
nbytes = write(event_fdArray[i].data.fd,buf,strlen(buf));
if(nbytes == -1)
{
perror("write err:");
continue;
}
}
}
}
}

总结

epollselectpoll 都是依赖于 事件检测机制 的 I/O 多路复用的机制,可以同时监听多个文件描述符的读写事件,并在事件发生时通知程序进行处理。它们的实现方式和底层机制不同,因此有以下区别:

应用程序对文件描述符的时间复杂度
在select和poll中,应用程序需要将要监听的文件描述符集合传递给内核,内核将这些文件描述符复制到内核空间,然后将它们加入到一个由内核维护的数据结构中,然后在等待事件时,应用程序阻塞等待内核通知有事件发生,内核查找事件集合的时间复杂度为O(n)
在epoll中,应用程序需要将要监听的文件描述符加入到一个由内核维护的红黑树中,然后在等待事件时,当有事件发生时,内核会在红黑树中查找对应的文件描述符并通知应用程序。内核查找事件树的时间复杂度为O(logN), 在新的kernal2.6之后为散列表,遍历时间为O(1);

文件描述符数量的处理方式
在select和poll中,每次调用select和poll时,应用程序需要将所有要监听的文件描述符都传递给内核,内核需要遍历所有文件描述符,这样当文件描述符很多时,效率会比较低。
在epoll中,应用程序只需要在第一次调用epoll时将所有要监听的文件描述符加入到红黑树中,然后在后续的调用中,只需要传递一个epoll实例,这个实例中包含了之前加入的所有文件描述符,不需要再次传递文件描述符,因此效率会更高。

内存的处理方式
在select和poll中,内核需要将应用程序传递的文件描述符复制到内核空间,这样会产生一定的内存开销。有事件时,再把这个集合传递到用户进程空间,浪费大量的拷贝与遍历算力。
在epoll中,内核只需要在第一次调用epoll时将文件描述符加入红黑树中,后续的调用只需要传递epoll实例,不需要重复传递文件描述符,只传递有事件已经就绪的文件描述符的集合,因此内存开销会比较小,可以大在提升服务器并发的能力。

总之,虽然 selectpollepoll 都是 I/O 多路复用的机制,但它们在实现方式和性能上都有所不同,应用程序可以根据自己的需求选择合适的机制。