Linux C 简单版线程池 + epoll实现服务器中遇到的线程阻塞的问题

编程入门 行业动态 更新时间:2024-10-09 13:25:33

Linux C 简单版<a href=https://www.elefans.com/category/jswz/34/1771240.html style=线程池 + epoll实现服务器中遇到的线程阻塞的问题"/>

Linux C 简单版线程池 + epoll实现服务器中遇到的线程阻塞的问题

最近在学习线程池方面的内容,epoll与线程池结合实现服务器。
然后遇到了一个线程阻塞的问题
问题描述:
当一个客户端接入服务器后,发送数据,服务端能够打印出数据并回射给客户端。但在连其他客户端后,其他客户端在发送数据后服务器就不能打印数据并回射了。甚至直接退出。
下面是代码:
threadpoolsimple.h

#ifndef _THREADPOOL_H
#define _THREADPOOL_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "sys/epoll.h"
#include "wrap.h"typedef struct _PoolTask                   //任务结构体
{int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数int fd;int epfd;struct epoll_event *evs;//当前任务对应文件描述符的事件结构体}PoolTask ;typedef struct _ThreadPool                //线程池结构体
{int max_job_num;//最大任务个数int job_num;//实际任务个数PoolTask *tasks;//任务队列数组首地址int job_push;//入队位置int job_pop;// 出队位置int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组,存放线程int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件  条件变量pthread_cond_t not_empty_task;//任务队列不为空的条件  条件变量}ThreadPool;void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
//void addtask(ThreadPool *pool);//添加任务到线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs);//
void taskRun(void *arg);//任务回调函数#endif

threadpoolsimple.c

//简易版线程池
#include "threadpoolsimple.h"ThreadPool *thrPool = NULL;int beginnum = 1000;void *thrRun(void *arg)
{//printf("begin call %s-----\n",__FUNCTION__);ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&thrPool->pool_lock);//无任务并且线程池不是要摧毁while(thrPool->job_num <= 0 && !thrPool->shutdown )//如果线程池中没有任务就循环等待有任务{//如果没有任务,线程会阻塞pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);}if(thrPool->job_num){//有任务需要处理taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//taskpos表示当前处理的任务在任务队列的下标,thrPool->job_pop相当于队首,处理一个任务,队首就加1。//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务,因为是在释放锁之后才执行任务的回调函数,在释放锁之后,有可能当前的tasks[taskpos]被添加者修改了,所以要将其拷贝给一个中间变量task,好让我们安全的执行任务。memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));task->arg = task;thrPool->job_num--;task = &thrPool->tasks[taskpos];pthread_cond_signal(&thrPool->empty_task);//通知生产者,因为已经处理任务了,所以告诉生产者现在任务队列有空闲单元}if(thrPool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&thrPool->pool_lock);free(task);pthread_exit(NULL);}//释放锁printf("执行任务回调\n");pthread_mutex_unlock(&thrPool->pool_lock);// printf("001\n");task->task_func(task->arg);//执行回调函数// printf("002\n");}//printf("end call %s-----\n",__FUNCTION__);
}//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{printf("begin call %s-----\n",__FUNCTION__);thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));thrPool->thr_num = thrnum;    //由main函数中指定的线程池中的线程个数这里是3thrPool->max_job_num = maxtasknum;//最大任务数量 = main函数中给定的参数maxtasknum,这里是20thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁thrPool->job_push = 0;//任务队列添加的位置thrPool->job_pop = 0;//任务队列出队的位置thrPool->job_num = 0;//初始化的任务个数为0thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列//初始化锁和条件变量pthread_mutex_init(&thrPool->pool_lock,NULL);pthread_cond_init(&thrPool->empty_task,NULL);pthread_cond_init(&thrPool->not_empty_task,NULL);int i = 0;thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for(i = 0;i < thrnum;i++){pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程}//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{pool->shutdown = 1;//开始自爆pthread_cond_broadcast(&pool->not_empty_task);//诱杀 ,发出任务队列并不是空的,有任务待取。诱使所有阻塞在条件变量的线程过来抢锁,抢到锁后 线程函数中的pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock)函数返回,继续while判断,此时由于 pool->shutdown ==1,并且thrPool->shutdown==0,所以会执行/* if(thrPool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//使当前线程分离,无需主线程回收资源,pthread_self()获取当前线程号,pthread_mutex_unlock(&thrPool->pool_lock);//解锁free(task);//释放中间变量taskpthread_exit(NULL);//退出线程}这部分代码,*/pthread_cond_destroy(&pool->not_empty_task);  //销毁条件变量1pthread_cond_destroy(&pool->empty_task);   //销毁条件变量2pthread_mutex_destroy(&pool->pool_lock);  //销毁互斥锁free(pool->tasks);      //释放任务队列free(pool->threads);   //释放线程数组free(pool);          //释放线程池
}//添加任务到线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs)
{printf("begin call %s-----\n",__FUNCTION__);pthread_mutex_lock(&pool->pool_lock);//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(pool->max_job_num <= pool->job_num){pthread_cond_wait(&pool->empty_task,&pool->pool_lock);}int taskpos = (pool->job_push++)%pool->max_job_num; //taskpos是task队列数组中当前添加的任务的下标,而job_push相当于队列的队尾,比如当第一个task被添加时,(pool->job_push++)%pool->max_job_num 即 先让pool->job_push++取模pool->max_job_num即0%20 =0,所以taskpos = 0;然后pool->job_push再++,即队尾变成了1,当pool->job_push加到20时,再次取模taskpos = 20%20=0,即任务又从第0个开始//printf("add task %d  tasknum===%d\n",taskpos,beginnum);pool->tasks[taskpos].tasknum = beginnum++;//数组第0个元素其实就是第一个任务(也可以说成是第0个任务),将第一个任务的编号定为1000,beginnum在上面被定义为1000;然后beginnum再++ ,变为1001。pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];//指定当前任务的回调函数的参数为当前这个任务的地址pool->tasks[taskpos].task_func = taskRun;//指定当前任务的回调函数pool->tasks[taskpos].fd = fd; //指定当前任务的文件描述符pool->tasks[taskpos].evs = evs;  //指定当前任务对应文件描述符的事件结构体的指针。pool->job_num++;//线程池中当前实际的任务的数量加1pthread_mutex_unlock(&pool->pool_lock);pthread_cond_signal(&pool->not_empty_task);//通知包身工printf("end call %s-----\n",__FUNCTION__);
}//任务回调函数
void taskRun(void *arg)
{//printf("003\n");PoolTask *task = (PoolTask*)arg;char buf[1024]="";int n = Read(task->fd , buf,sizeof(buf));if(n == 0 ){close(task->fd);//关闭cfdepoll_ctl(task->epfd,EPOLL_CTL_DEL,task->fd,task->evs);//将cfd下树printf("client close\n");}else if(n> 0){printf("%s\n",buf );Write(task->fd ,buf,n);}//printf("004\n");}int main()
{create_threadpool(3,20);int i = 0;//创建套接字,绑定int lfd = tcp4bind(8000,NULL);//监听listen(lfd,128);//创建树int epfd = epoll_create(1);struct epoll_event ev,evs[1024];ev.data.fd = lfd;ev.events = EPOLLIN;//监听读事件//将ev上树epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev);while(1){int nready = epoll_wait(epfd,evs,1024,-1);if(nready < 0)perr_exit("err");else if(nready == 0)continue;else if(nready > 0 ){for(int i=0;i<nready;i++){if(evs[i].data.fd == lfd && evs[i].events & EPOLLIN)//如果是lfd变化,并且是读事件{struct sockaddr_in cliaddr;char buf_ip[16]="";socklen_t len  = sizeof(cliaddr);int cfd = Accept(lfd,(struct sockaddr *)&cliaddr,&len);printf("client ip=%s port=%d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,buf_ip,sizeof(buf_ip)),ntohs(cliaddr.sin_port));struct epoll_event ev1;ev1.data.fd = cfd;//cfd上树ev1.events = EPOLLIN;//监听读事件epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev1);//将cfd上树}else if(evs[i].events & EPOLLIN)//普通的读事件{addtask(thrPool,evs[i].data.fd,&evs[i]);}}}}close(lfd);destroy_threadpool(thrPool);return 0;
}


如图,在第一个客户端发送444444后得到了回应,addtask被调用了5次。但2,3两个客户端发送数据后直接退出了。

再看,这一次直接调用了很多次addtask。
既然addtask被调用这么多次,那么是谁在调用addtask呢?
发现调用addtask是在main函数中的,即下面这段代码

  while(1){int nready = epoll_wait(epfd,evs,1024,-1);if(nready < 0)perr_exit("err");else if(nready == 0)continue;else if(nready > 0 ){for(int i=0;i<nready;i++){if(evs[i].data.fd == lfd && evs[i].events & EPOLLIN)//如果是lfd变化,并且是读事件{struct sockaddr_in cliaddr;char buf_ip[16]="";socklen_t len  = sizeof(cliaddr);int cfd = Accept(lfd,(struct sockaddr *)&cliaddr,&len);printf("client ip=%s port=%d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,buf_ip,sizeof(buf_ip)),ntohs(cliaddr.sin_port));struct epoll_event ev1;ev1.data.fd = cfd;//cfd上树ev1.events = EPOLLIN;//监听读事件epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev1);//将cfd上树}else if(evs[i].events & EPOLLIN)//普通的读事件{addtask(thrPool,evs[i].data.fd,&evs[i]);}}

说明这个while循环执行了很多次。但是我只在往一个客户端发过一次数据,那么该客户端的文件描述符不是应该只发生一次变化吗,那么何来这么多次的addtask的调用呢?

于是,我猜想是不是epoll_wait的原因
经查阅资料,发现:
epoll在内核中会创建一个链表list,当有文件描述符发生变化,就会把该文件描述符添加到这个list中,epoll_wait发现list不为空了,就会被唤醒并返回文件描述发生变化的个数,然后执行后面的代码。这个过程中epoll会把list对应文件描述符的事件驱动结构体拷贝到events[]数组中,然后再清空list。

那么会不会是epoll_wait函数下一次调用时,list还没被清空?
但我不知道怎么查看list是否被清空,所以我在while循环中加了一个sleep(1),即每返回一次epoll_wait,就延迟1秒钟,假设是这个原因,那么估计1秒钟后list会被清空。

 while(1){sleep(1);//加了一个sleepint nready = epoll_wait(epfd,evs,1024,-1);if(nready < 0)perr_exit("err");else if(nready == 0)continue;else if(nready > 0 ){for(int i=0;i<nready;i++){if(evs[i].data.fd == lfd && evs[i].events & EPOLLIN)//如果是lfd变化,并且是读事件{struct sockaddr_in cliaddr;char buf_ip[16]="";socklen_t len  = sizeof(cliaddr);int cfd = Accept(lfd,(struct sockaddr *)&cliaddr,&len);printf("client ip=%s port=%d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,buf_ip,sizeof(buf_ip)),ntohs(cliaddr.sin_port));struct epoll_event ev1;ev1.data.fd = cfd;//cfd上树ev1.events = EPOLLIN;//监听读事件epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev1);//将cfd上树}else if(evs[i].events & EPOLLIN)//普通的读事件{addtask(thrPool,evs[i].data.fd,&evs[i]);}}

效果如下图

如图,居然成功了,三个客户端发送的数据都能被显示和回射。并且addtask每次只执行一次。
开森!!!!
然后思考了一下,应该是epoll_wait返回时,内核中有一些操作还没完全做完。
而while循环在执行到下一次epoll_wait时,list还没被清空,导致epoll_wait认为list仍然不为空,于是继续返回,events[]中的内容仍为上一次的内容,导致再次执行addtask。经测试,我这里一次大概执行了几十次addtask ,而实际上我只往服务端写了一次数据。这样我总共设定的任务队列只有20个任务,但它一下给我添了很多任务,甚至添满了。这样如果是一个客户端还好,因为当前服务器只需对一个文件描述符进行处理,那么线程函数处理一个任务,任务队列就会空一个位置,虽然addtask添加很多,但只要该缓冲区有数据就不影响执行;但是如果多个客户端连接了,比如第一个客户端发送了一条数据,addtask被调用多次,添了很多任务,这些任务都是第一个客户端的任务。那么此时一个线程执行任务后,空出一个任务单元,立马等待的addtask就添加进去。此时,缓冲区没有数据,read会阻塞。而你在另一个客户端发数据,又调用了多次addtask,线程执行的是第一个客户端的任务,并且在阻塞,根本没读你现在这个客户端的缓冲区,于是你发现服务器对你当前这个客户端没反应。只有等第一个客户端的发送的任务全被执行了或者直接关闭了第一个客户端的文件描述符,才能执行你现在当前客户端的任务。。

最后,作为小白的我并没有找到更好的办法来防止这种情况的发生,毕竟睡1秒对于计算机来说是很长的时间了;希望有大佬能够指点一二。

更多推荐

Linux C 简单版线程池 + epoll实现服务器中遇到的线程阻塞的问题

本文发布于:2024-02-13 11:01:46,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1758178.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:线程   器中   简单   Linux   epoll

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!