2.使用I/O复用技术和线程池
网络中有很多用户会尝试去connect()这个WebServer上正在listen的这个port,而监听到的这些连接会排队等待被accept()。由于用户连接请求是随机到达的异步事件,每当监听socket(listenfd)listen到新的客户连接并且放入监听队列,我们都需要告诉Web服务器有连接来了,accept这个连接,并分配一个逻辑单元来处理这个用户请求。而且,我们在处理这个请求的同时,还需要继续监听其他客户的请求并分配另一逻辑单元来处理新的用户请求(即并发,同时处理多个事件,后面会使用线程池实现并发)。
这里,服务器通过epoll这种I/O复用技术来实现对监听socket(listenfd)和连接socket(客户请求)的同时监听。I/O复用虽然可以同时监听多个文件描述符,但是它本身是阻塞的,并且当有多个文件描述符同时就绪的时候,如果不采取额外措施,程序则只能按顺序处理其中就绪的每一个文件描述符,所以为提高效率,我们将在这部分通过线程池来实现并发(多线程并发),为每个就绪的文件描述符分配一个逻辑单元(线程)来处理。
代码块
//对文件描述符设置非阻塞
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
/* 将fd上的EPOLLIN和EPOLLET事件注册到epollfd指示的epoll内核事件中 */
void addfd(int epollfd, int fd, bool one_shot) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
/* 针对connfd,开启EPOLLONESHOT,因为我们希望每个socket在任意时刻都只被一个线程处理 */
if(one_shot)
event.events |= EPOLLONESHOT;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
/* 创建一个额外的文件描述符来唯一标识内核中的epoll事件表 */
int epollfd = epoll_create(5);
/* 用于存储epoll事件表中就绪事件的event数组 */
epoll_event events[MAX_EVENT_NUMBER];
/* 主线程往epoll内核事件表中注册监听socket事件,当listen到新的客户连接时,listenfd变为就绪事件 */
addfd(epollfd, listenfd, false);
/* 主线程调用epoll_wait等待一组文件描述符上的事件,并将当前所有就绪的epoll_event复制到events数组中 */
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
/* 然后我们遍历这一数组以处理这些已经就绪的事件 */
for(int i = 0; i < number; ++i) {
int sockfd = events[i].data.fd; // 事件表中就绪的socket文件描述符
if(sockfd == listenfd) { // 当listen到新的用户连接,listenfd上则产生就绪事件
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
/* ET模式 */
while(1) {
/* accept()返回一个新的socket文件描述符用于send()和recv() */
int connfd = accept(listenfd, (struct sockaddr *) &client_address, &client_addrlength);
/* 并将connfd注册到内核事件表中,users是 http_conn 类型的数组 */
users[connfd].init(connfd, client_address);
/*
...
*/
}
}
else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) {
// 如有异常,则直接关闭客户连接,并删除该用户的timer
/*
...
*/
}
else if(events[i].events & EPOLLIN) {
/* 当这一sockfd上有可读事件时,epoll_wait通知主线程。*/
if(users[sockfd].read()) { /* 主线程从这一sockfd循环读取数据, 直到没有更多数据可读 */
pool->append(users + sockfd); /* 然后将读取到的数据封装成一个请求对象并插入请求队列 */
/*
...
*/
}
else
/*
...
*/
}
else if(events[i].events & EPOLLOUT) {
/* 当这一sockfd上有可写事件时,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果 */
if(users[sockfd].write()) {
/*
...
*/
}
else
/*
...
*/
}
}
accept函数
服务器通过 accept() 函数来接收客户端请求。
函数原型如下:
int accept(int sock, struct sockaddr *addr, socklen_t *addrlen)
sock 为服务器端套接字,addr 为 sockaddr_in 结构体变量,addrlen 为参数 addr 的长度,可由 sizeof() 求得。
accept() 返回一个新的套接字来和客户端通信,addr 保存了客户端的IP地址和端口号。后面和客户端通信时,要使用这个新生成的套接字。
I/O复用
I/O复用使得程序能同时监听多个文件描述符。通常,网络程序在以下情况需要使用I/O复用技术:
- 客户端程序要同时处理多个socket。
- 客户端程序要同时处理用户输入和网络连接。
- TCP服务器要同时处理监听socket和连接socket。这是I/O复用使用最多的场合。
- 服务器要同时处理TCP请求和UDP请求。
- 服务器要同时监听多个端口,或处理多种服务。
注意:I/O复用本身是阻塞的。
Linux下实现I/O复用的系统调用主要有select、poll、epoll。
select
int select ( int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout )
1)nfds参数指定被监听的文件描述符的总数。通常设置为被监听的所有文件描述符中的最大值加1,因为文件描述符是从0开始计数的。
2)readfds、writefds、exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。
3)timeout参数用来设置select函数的超时时间。
返回值:成功时返回就绪文件描述符的总数,超时返回0,出错返回-1并设置errno。
poll
int poll ( struct pollfd * fds, nfds_t nfds, int timeout )
1)fds参数是一个pollfd结构类型的数组,它指定所有我们感兴趣的文件描述符上发生的可读、可写、异常等事件。
2)nfds参数指定被监听事件集合fds的大小。
3)timeout参数指定poll超时值,单位是毫秒。当timeout值为-1时,poll调用将永远阻塞,直到某个事件发生;当timeout值为0时,poll调用将立即返回。
poll返回值和select一样。
epoll
epoll是Linux特有的I/O复用函数。epoll使用一组函数(共三个函数)完成任务,把用户关心的文件描述符上的事件放在内核里的一个事件表中,无需像select和poll那样每次调用都要重复传入文件描述符或事件集。但epoll需要一个额外的文件描述符,来唯一标识内核中的这个事件表。这个文件描述符使用epoll_create函数来创建:
int epoll_create ( int sieze )
size参数只是告诉内核这个epoll对象会处理的事件大致数目,而不是能够处理事件的最大数目。即size参数没有任何作用。
返回值:成功:epoll 专用的文件描述符;失败:-1。
注意:使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
操作epoll的内核事件表的函数:
int epoll_ctl( int epfd, int op, int fd, struct epoll_event *event )
epfd参数即epoll句柄(使用epoll_create函数返回的文件描述符),op参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
fd参数指需要监听的fd,event参数告诉内核需要监听什么事,struct epoll_event结构如下:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
event可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
例如将event设置为 读 和 ET模式 事件的集合:ev.events = EPOLLIN | EPOLLET;
返回值:epoll_ctl成功时返回0,失败返回-1并设置errno。
注意:它不同于 select() 是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
epoll_wait函数。等侍注册在epfd上的socket fd的事件的发生,其原型如下:
int epoll_wait( int epfd, struct epoll_event * events, int maxevents, int timeout )
1)epfd是 epoll的描述符。
2)events则是分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高)。
3)maxevents表示本次可以返回的最大事件数目,通常 maxevents参数与预分配的events数组的大小是相等的。
4)timeout表示在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待。
返回值:该函数成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno。
注意:如果有事件的发生则会将发生的socket fd和事件类型放入到events数组中,并将注册在epfd上的socket fd的事件类型给清空。如果下一个循环还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。
epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和 ET(Edge Trigger,边沿触发)模式。
LT模式是默认的工作模式,这种模式下epoll相当于一个效率较高的poll,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通知此事件,直到此事件被处理。
ET模式下,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait不再向应用程序通知这一事件。ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高。需要往epoll内核事件表中注册一个文件描述符上的EPOLLET事件epoll才能变为ET模式。
注意:每个使用ET模式的文件描述符都应该是非阻塞的。如果是阻塞的,那么读或写操作会因为没有后续的事件而一直处于阻塞状态。
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发的程序中会导致多个线程(或进程)同时操作一个socket的情况出现。例如一个线程在读取完某个socket上的数据后开始处理这些数据,而在处理这些数据的过程中该socket又有新数据可读(EPOLLIN 再次被触发),此时程序会唤醒另一个线程来读取这些新的数据。这并不是我们期望的,这会使程序的健壮性大大降低而编程的复杂度大大增加。我们期望的是一个socket连接在任意时刻都只被一个线程处理。这就可以使用epoll的 EPOLLONESHOT 事件实现。
对于注册了 EPOLLONESHOT 事件的文件描述符,操作系统最多触发其上注册的一个可读、可写、或者异常事件,且只触发一次。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。但反过来,注册了 EPOLLONESHOT 事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的 EPOLLONESHOT 事件,以确保这个socket下一次可读时,其 EPOLLIN 事件能被触发,让其他工作线程有机会继续处理这个socket。
三组I/O复用函数的比较
系统调用 | 事件集合 | 应用程序索引就绪文件描述符的时间复杂度 | 最大支持文件描述符数 | 工作模式 | 内核实现和工作效率 |
---|---|---|---|---|---|
select | 用户通过3个参数分别传入感兴趣的可读、可写及异常等事件,内核通过对这些参数的在线修改来反馈其中的就绪事件。这使得用户每次调用select都要重置这3个参数 | O(n) | 一般有最大限制 | LT | 采用轮询的方式来检测就绪事件,算法复杂度为O(n) |
poll | 统一处理所有事件类型,因此只需一个事件集参数,用户通过pollfd.events传入感兴趣的事,内核通过修改pollfd.revents反馈其中就绪的事件 | O(n) | 65535 | LT | 采用轮询的方式来检测就绪事件,算法复杂度为O(n) |
epoll | 内核通过一个事件表直接管理用户感兴趣的所有事件。因此每次调用epoll_wait时,无需反复传入用户感兴趣的事件。epoll_wait的参数events仅用来反馈就绪的事件 | O(1) | 65535 | LT 或 ET | 采用回调方式来检测就绪事件,算法复杂度为O(1) |
综上,当监测的fd数量较小,且各个fd都很活跃的情况下,建议使用select和poll;当监听的fd数量较多,且单位时间仅部分fd活跃的情况下,使用epoll会明显提升性能。
多线程编程
创建线程和结束线程
线程相关常用的API如下(在Linux系统上都定义在pthread.h头文件中):
- pthread_create
用于创建一个线程,定义如下:
int pthread_create (pthread_t* thread, const pthread_attr_t* attr, void* (start_routine)( void ), void* arg)
1)thread参数是新线程的标识符,其他线程相关函数通过它来引用新线程。其是一个整形类型,在Linux上几乎所有的资源标识符都是一个整型数,比如socket。
2)attr参数用于设置新线程的属性。给它传 NULL 值时表示使用默认线程属性。
3)start_routine和arg参数分别指定线程将运行的函数及其参数,如果参数不止一个,需要将参数写到一个结构体中,再将该结构体的地址作为参数传入。
返回值:成功时返回0,失败时返回错误码。
注意:
- 线程数量受资源限制是有限的,线程总数不能超过内核参数所定义的值。
- 传入start_routine参数的函数要求为静态函数。
要在静态函数中使用类的动态成员有两种方法:
- 通过类的静态对象来调用
- 将类的对象作为参数传递给该静态函数
- pthread_exit
线程函数在结束时最好调用此函数,以确保安全、干净地退出,因为默认属性的线程执行结束后并不会立即释放占用的资源,直到整个进程执行结束,所有线程的资源以及整个进程占用的资源才会被操作系统回收。其函数原型如下:
void pthread_exit ( void* retval )
此函数通过 retval 参数向线程的回收者传递其退出信息,如果线程不需要返回任何数据,将 retval 参数置为 NULL 即可。
它执行完后不会返回到调用者,而且用于不会失败。
- pthread_join
一个进程中的所有线程都可以调用此函数来回收其他线程(前提是目标线程是可回收的),即等待其他线程结束。其定义如下:
int pthread_join( pthread_t thread, void retval );
thread参数是目标线程的标识符,retval则是目标线程返回的退出信息。该函数会一直阻塞**,直到被回收的线程结束为止。
返回值:成功时返回0,失败则返回错误码。
可能的错误码如下:
(1) EDEADLK:可能引起死锁,例如两个线程互相join等待对方
(2) EINVAL:目标线程不可回收,或者有其他线程正在join等待本线程
(3) ESRCH:线程不存在
- pthread_cancel
可用此函数向另一个线程发送“终止执行”的信号(后续称“Cancel”信号),从而令目标线程结束执行。函数原型如下:
int pthread_cancel(pthread_t pthread)
参数为目标线程的标识符。
返回值:成功返回0,失败则返回错误码。
注意: 函数的功能仅仅是向目标线程发送 Cancel 信号,至于目标线程是否处理该信号以及何时结束执行,由目标线程决定。
接收到取消信号的目标线程可以决定是否允许被取消以及如何取消,这分别由以下两个函数完成(成功时都返回 0):
int pthread_setcancelstate(int state, int *oldstate)
int pthread_setcanceltype(int type, int *oldtype)
这两个参数的第一个参数分别用于设置线程的取消状态(是否允许取消)和取消类型(如何取消),第二个参数则分别记录线程原来的取消状态和取消类型。state参数有两个可选值:
- PTHREAD_CANCEL_ENABLE:允许线程被取消。是线程创建时的默认状态
- PTHREAD_CANCEL_DISABLE:禁止线程被取消。这种情况下的线程收到取消请求,则它会将请求挂机,直到该线程允许被取消。
type参数也有两个可选值:
- PTHREAD_CANCEL_DEFERRED:线程随时都可以被取消。它将使得收到取消请求的目标线程立即采取行动。
- PTHREAD_CANCEL_ASYNCHRONOUS:允许目标线程推迟行动,直到它调用了下面几个所谓的取消点函数中的一个,pthread_join、pthread_testcancel、pthread_cond_wait、pthread_cond_timedwait、sem_wait、sigwait、read、wait等。不过为了安全,最好在可能被取消的代码中调用 pthread_testcancel 函数以设置取消点。
线程结束执行的方式共有 3 种,分别是:
- 线程将指定函数体中的代码执行完后自行结束。
- 线程执行过程中,遇到 pthread_exit() 函数结束执行。
- 线程执行过程中,被同一进程中的其它线程(包括主线程)强制终止。
第一种很容易理解,第二种和第三种方式我们将分别举例给大家演示用法。
pthread_exit() 函数的用法:
#include <stdio.h>
#include <pthread.h>
//线程要执行的函数,arg 用来接收线程传递过来的数据
void *ThreadFun(void *arg)
{
//终止线程的执行,将“https://www.cnblogs.com/zyzhi”返回
pthread_exit("https://www.cnblogs.com/zyzhi"); //返回的字符串存储在常量区,并非当前线程的私有资源
printf("*****************");//此语句不会被线程执行
}
int main()
{
int res;
//创建一个空指针
void * thread_result;
//定义一个表示线程的变量
pthread_t myThread;
res = pthread_create(&myThread, NULL, ThreadFun, NULL);
if (res != 0) {
printf("线程创建失败");
return 0;
}
//等待 myThread 线程执行完成,并用 thread_result 指针接收该线程的返回值
res = pthread_join(myThread, &thread_result);
if (res != 0) {
printf("等待线程失败");
}
printf("%s", (char*)thread_result);
//输出结果为 https://www.cnblogs.com/zyzhi
return 0;
}
第三种方法是指一个线程可以借助 pthread_cancel() 函数向另一个线程发送“终止执行”的信号,从而令目标线程结束执行。对于接收 Cancel 信号后结束执行的目标线程,等同于该线程自己执行如下语句:
pthread_exit(PTHREAD_CANCELED);
PTHREAD_CANCELED是一种宏(定义在<pthread.h>头文件中)
pthread_cancel() 函数的用法:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h> // sleep() 函数
//线程执行的函数
void * thread_Fun(void * arg) {
printf("新建线程开始执行\n");
sleep(10);
}
int main()
{
pthread_t myThread;
void * mess;
int value;
int res;
//创建 myThread 线程
res = pthread_create(&myThread, NULL, thread_Fun, NULL);
if (res != 0) {
printf("线程创建失败\n");
return 0;
}
sleep(1);
//向 myThread 线程发送 Cancel 信号
res = pthread_cancel(myThread);
if (res != 0) {
printf("终止 myThread 线程失败\n");
return 0;
}
//获取已终止线程的返回值
res = pthread_join(myThread, &mess);
if (res != 0) {
printf("等待线程失败\n");
return 0;
}
//如果线程被强制终止,其返回值为 PTHREAD_CANCELED
if (mess == PTHREAD_CANCELED) {
printf("myThread 线程被强制终止\n");
}
else {
printf("error\n");
}
return 0;
}
/*
最后输出:
新建线程开始执行
myThread 线程被强制终止
*/
线程分离
线程分为两种状态:可结合态分离态
- 可结合态(线程默认状态)
在此状态下的线程能够被其他线程回收资源或杀死,在被其他线程回收前,其占有的存储器资源不会释放。
- 分离态
这种状态下的线程不能被其他线程回收或杀死,它的存储器资源在它终止时由系统自动释放。
可以使用线程分离函数将线程变为分离态:
int pthread_detach( pthread_t thread)
返回值:成功时返回0,失败返回-1
POSIX 信号量
多线程程序必须考虑同步问题。pthread_join 可以看作一种简单的线程同步方式,但它无法高效地实现复杂的同步需求,比如控制对共享资源的独占式访问。所以我们需要学习 3 种专门用于线程同步的机制:POSIX信号量、互斥量、条件变量。
常用的 POSIX 信号量函数有以下 5 个,都定义在 semaphore.h 中:
- sem_init
int sem_init( sem_t *sem, int pshared, unsigned int value )
用于初始化一个未命名的信号量
参数:
1)sem:要初始化的信号量
2)pshared:指定信号量的类型,如果为 0,表示这个信号量是当前进程的局部信号量,否则该信号量就可以在多个进程间共享
3)value:指定信号量的初始值
注意:初始化一个已经被初始化的信号量将导致不可预期的结果
- sem_destroy
int sem_destroy( sem_t *sem )
用于销毁一个信号量
注意:销毁一个正在被其他线程等待的信号量将导致不可预期的结果
- sem_wait
int sem_wait( sem_t *sem )
以原子操作的方式将信号量的值 -1
如果信号量的值为 0,则 sem_wait 将被阻塞直到信号量有非 0 值
- sem_trywait
int sem_trywait( sem_t *sem )
以原子操作的方式将信号量的值 -1,它会立即返回(相当于 sem_wait 的非阻塞版本)
信号量为 0 时会返回 -1 并设置 errno 为 EAGAIN
- sem_post
int sem_post( sem_t *sem )
以原子操作的方式将信号量的值 +1
当信号量的值 > 0 时,其他正在调用 sem_wait 等待信号量的线程将被唤醒
这5个函数成功时返回 0,失败则返回-1并设置errno。
互斥量
互斥量(互斥锁)可以保护关键代码,以确保其独占式的访问。
POSIX互斥锁的相关函数主要有如下 5个,都定义在 pthread.h 中:
- pthread_mutex_init
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr)
用于初始化互斥锁。
这些函数的mutex参数都指向要操作的目标互斥锁。mutexattr参数指定互斥锁的属性,为NULL时表示使用默认属性。
还可以使用如下方式来初始化一个互斥锁:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
- pthread_mutex_lock
int pthread_mutex_lock(pthread_mutex_t *mutex)
以原子方式给一个互斥锁加锁。如果目标互斥锁已经被锁上,则将阻塞,直到该互斥锁的占有者将其解锁。
- pthread_mutex_trylock
int pthread_mutex_trylock(pthread_mutex_t *mutex)
与 pthread_mutex_lock 类似(相当于 pthread_mutex_lock 的非阻塞版)。始终立即返回,当目标锁已经被加锁时,将返回错误码EBUSY。
- pthread_mutex_unlock
int pthread_mutex_unlock(pthread_mutex_t *mutex)
以原子方式给一个互斥锁解锁。如果此时有其他线程正在等待这个互斥锁,则这些线程中的某一个将获得它。
- pthread_mutex_destroy
int pthread_mutex_destroy(pthread_mutex_t *mutex)
用于销毁互斥锁,以释放其占用的内核资源。销毁一个已经加锁的互斥锁将导致不可预期的后果。
上面这些函数成功时返回 0,失败则返回错误码。
条件变量
如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于在线程之间同步共享数据的值。假设一个进程中包含多个线程,这些线程共享变量 x,我们希望某个(或某些)线程等待 “x==10” 条件成立后再执行后续的代码,就可以使用条件变量来实现。
条件变量提供了一种通知机制:当某个共享数据达到某个值的时候,唤醒等待这个共享数据的线程。
为了避免多线程之间发生“抢夺资源”的问题,条件变量在使用过程中必须和一个互斥锁搭配使用。
条件变量用 pthread_cond_t 类型的变量表示,条件变量的相关函数主要有以下几个,都定义在 pthread.h 中:
- 初始化条件变量
int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr)
参数 cond 用于指明要初始化的条件变量;参数 attr 用于自定义条件变量的属性,通常我们将它赋值为 NULL,表示以系统默认的属性完成初始化操作。
当使用默认属性去初始化时,也可以用如下方法完成初始化:
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER
- 阻塞当前线程,等待条件成立
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex)
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, const struct timespec* abstime)
cond 参数表示已初始化好的条件变量;mutex 参数表示与条件变量配合使用的互斥锁;abstime 参数表示阻塞线程的时间。
注意:abstime 参数指的是绝对时间,如果要阻塞线程 5 秒钟,就需要用获得的当前系统的时间去加上 5 秒,最终得到的时间才是传递的实参值。
调用两个函数之前,我们必须先创建好一个互斥锁并完成 加锁 操作,然后才能作为实参传递给 mutex 参数。两个函数会完成以下两项工作:
- 阻塞线程,直至接收到“条件成立”的信号
- 当线程被添加到等待队列上时,将互斥锁 解锁
注意:当函数接收到“条件成立”的信号后,它并不会立即结束对线程的阻塞,而是先完成对互斥锁的“加锁”操作,然后才解除阻塞。
两个函数的区别在于:
- pthread_cond_wait() 函数可以永久阻塞线程,直到条件变量成立的那一刻
- pthread_cond_timedwait() 函数只能在 abstime 参数指定的时间内阻塞线程,超出时限后,该函数将重新对互斥锁执行“加锁”操作,并解除对线程的阻塞,函数的返回值为 ETIMEDOUT。
- 解除线程的阻塞状态
int pthread_cond_signal(pthread_cond_t* cond)
int pthread_cond_broadcast(pthread_cond_t* cond)
cond 参数表示初始化好的条件变量
两个函数都能解除线程的“被阻塞”状态,区别在于:
- pthread_cond_signal() 函数至少解除一个线程的“被阻塞”状态,如果等待队列中包含多个线程,优先解除哪个线程将由操作系统的线程调度程序决定
- pthread_cond_broadcast() 函数可以解除等待队列中所有线程的“被阻塞”状态。
由于互斥锁的存在,解除阻塞后的线程也不一定能立即执行。当互斥锁处于“加锁”状态时,解除阻塞状态的所有线程会组成等待互斥锁资源的队列,等待互斥锁“解锁”。
- 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond)
cond 参数表示要销毁的条件变量
注意:销毁后的条件变量还可以调用 pthread_cond_init() 函数重新初始化后使用。
以上函数成功时都返回0,失败则返回错误码。
线程同步机制包装成类
为了充分复用代码,将上面的 3 种线程同步机制分别封装成 3 个类,实现在 locker.h 头文件中。
#ifndef LOCKER_H
#define LOCKER_H
#include <exception>
#include <pthread.h>
#include <semaphore.h>
class sem
{
public:
sem()
{
if (sem_init(&m_sem, 0, 0) != 0)
{
throw std::exception();
}
}
sem(int num)
{
if (sem_init(&m_sem, 0, num) != 0)
{
throw std::exception();
}
}
~sem()
{
sem_destroy(&m_sem);
}
bool wait()
{
return sem_wait(&m_sem) == 0;
}
bool post()
{
return sem_post(&m_sem) == 0;
}
private:
sem_t m_sem;
};
class locker
{
public:
locker()
{
if (pthread_mutex_init(&m_mutex, NULL) != 0)
{
throw std::exception();
}
}
~locker()
{
pthread_mutex_destroy(&m_mutex);
}
bool lock()
{
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock()
{
return pthread_mutex_unlock(&m_mutex) == 0;
}
pthread_mutex_t *get()
{
return &m_mutex;
}
private:
pthread_mutex_t m_mutex;
};
class cond
{
public:
cond()
{
if (pthread_cond_init(&m_cond, NULL) != 0)
{
//pthread_mutex_destroy(&m_mutex);
throw std::exception();
}
}
~cond()
{
pthread_cond_destroy(&m_cond);
}
bool wait(pthread_mutex_t *m_mutex)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_wait(&m_cond, m_mutex);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool timewait(pthread_mutex_t *m_mutex, struct timespec t)
{
int ret = 0;
//pthread_mutex_lock(&m_mutex);
ret = pthread_cond_timedwait(&m_cond, m_mutex, &t);
//pthread_mutex_unlock(&m_mutex);
return ret == 0;
}
bool signal()
{
return pthread_cond_signal(&m_cond) == 0;
}
bool broadcast()
{
return pthread_cond_broadcast(&m_cond) == 0;
}
private:
//static pthread_mutex_t m_mutex;
pthread_cond_t m_cond;
};
#endif
线程池
线程池一种线程使用模式。线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池的组成部分有:
- 线程池管理器:创建和初始化线程,启动和停止线程,调配任务;管理线程池
- 工作线程:线程池中的线程
- 任务接口:添加任务的接口,以提供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务,提供一种缓冲机制,同时具有调度功能,高优先级的任务放在队列前面
线程池中线程数量
线程池中的线程数量最直接的限制因素是中央处理器(CPU)的处理器(processors/cores)的数量N:如果你的CPU是4-cores的,对于CPU密集型的任务(如视频剪辑等消耗CPU计算资源的任务)来说,那线程池中的线程数量最好也设置为4(或者+1防止其他因素造成的线程阻塞);对于IO密集型的任务,一般要多于CPU的核数,因为线程间竞争的不是CPU的计算资源而是IO,IO的处理一般较慢,多于cores数的线程将为CPU争取更多的任务,不至在线程处理IO的过程造成CPU空闲导致资源浪费。
公式:最佳线程数 = CPU当前可使用的Cores数 * 当前CPU的利用率 * (1 + CPU等待时间 / CPU处理时间)
本项目采用的是半同步/半反应堆线程池,将线程池代码封装在 threadpool.h 头文件中
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
/* 引用上面的线程同步机制的包装类 */
#include "../lock/locker.h"
/* 线程池类,定义为模板方便复用 */
template <typename T>
class threadpool
{
public:
/* thread_number是线程池中线程数量,max_request是请求队列中最多允许的、等待处理的请求数量 */
threadpool(connection_pool *connPool, int thread_number = 8, int max_request = 10000);
~threadpool();
/* 往请求队列中添加任务 */
bool append(T *request);
private:
/* 工作线程运行的函数,它不断从工作队列中取出任务并执行 */
static void *worker(void *arg);
void run();
private:
int m_thread_number; // 线程池中的线程数
int m_max_requests; // 请求队列中允许的最大请求数
pthread_t *m_threads; // 描述线程池的数组,其大小为 m_thread_number
std::list<T *> m_workqueue; // 请求队列
locker m_queuelocker; // 保护请求队列的互斥锁
sem m_queuestat; // 是否有任务需要处理
bool m_stop; // 是否结束线程
connection_pool *m_connPool; //数据库
};
template <typename T>
threadpool<T>::threadpool( connection_pool *connPool, int thread_number, int max_requests) : m_thread_number(thread_number), m_max_requests(max_requests), m_stop(false), m_threads(NULL),m_connPool(connPool)
{
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
for (int i = 0; i < thread_number; ++i)
{
//printf("create the %dth thread\n",i);
/* 因为需要在静态函数中使用类的动态成员,故将类的对象作为参数闯入 */
if (pthread_create(m_threads + i, NULL, worker, this) != 0)
{
delete[] m_threads;
throw std::exception();
}
/* 将线程设置为分离态 */
if (pthread_detach(m_threads[i]))
{
delete[] m_threads;
throw std::exception();
}
}
}
template <typename T>
threadpool<T>::~threadpool()
{
delete[] m_threads;
m_stop = true;
}
template <typename T>
bool threadpool<T>::append(T *request)
{
/* 操作工作队列时需要加锁,因为它被所以线程共享 */
m_queuelocker.lock();
if (m_workqueue.size() > m_max_requests)
{
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}
template <typename T>
void *threadpool<T>::worker(void *arg)
{
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}
template <typename T>
void threadpool<T>::run()
{
while (!m_stop)
{
m_queuestat.wait();
m_queuelocker.lock();
if (m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
T *request = m_workqueue.front();
m_workqueue.pop_front();
m_queuelocker.unlock();
if (!request)
continue;
//从连接池中取出一个数据库连接
request->mysql = m_connPool->GetConnection();
//process(模板类中的方法,这里是http类)进行处理
request->process();
//将数据库连接放回连接池
m_connPool->ReleaseConnection(request->mysql);
}
}
#endif