Linux网络编程(六)

编程入门 行业动态 更新时间:2024-10-06 01:43:31

Linux<a href=https://www.elefans.com/category/jswz/34/1768814.html style=网络编程(六)"/>

Linux网络编程(六)

 一、epoll监听socket实现高并发服务器【水平触发(默认)】

#include <stdio.h>
#include <fcntl.h>
#include "wrap.h"
#include <sys/epoll.h>
int main(int argc, char *argv[])
{//创建套接字 绑定int lfd = tcp4bind(8000,NULL);//监听Listen(lfd,128);//创建树int epfd = epoll_create(1);//将lfd上树struct epoll_event ev,evs[1024];ev.data.fd = lfd;ev.events = EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev);//while监听while(1){int nready = epoll_wait(epfd,evs,1024,-1);//监听printf("epoll wait _________________\n");if(nready <0)	// 出错了{perror("");break;}else if( nready == 0)	// 没有文件描述符变化{continue;}else	//有文件描述符变化{for(int i=0;i<nready;i++) // nready为变化了的文件描述符数量,变化了的文件描述符保存在了evs数组中{//判断lfd变化,并且是“读”事件变化(变化的文件描述符也有可能是写事件)if(evs[i].events & EPOLLIN && evs[i].data.fd == lfd){struct sockaddr_in cliaddr;char ip[16]="";socklen_t len = sizeof(cliaddr);//提取新的连接int cfd = Accept(lfd,(struct sockaddr *)&cliaddr,&len);printf("new client ip=%s port =%d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,ip,16), ntohs(cliaddr.sin_port));//将cfd上树ev.data.fd =cfd; // 可以重复使用之前定义的evev.events =EPOLLIN;epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev);}else if(evs[i].events & EPOLLIN)//cfd 变化 ,而且是读事件变化{char buf[4]="";int n = read(evs[i].data.fd,buf,sizeof(buf));if(n < 0)//出错,cfd下树{//普通错误perror("");close(evs[i].data.fd);//将cfd关闭epoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]);break;}else if(n == 0)//客户端关闭 ,{printf("client close\n");close(evs[i].data.fd);//将cfd关闭epoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]);//下树break;}else{//	printf("%s\n",buf);write(STDOUT_FILENO,buf,4);write(evs[i].data.fd,buf,n);}}}}}return 0;
}

二、epoll + 线程池

threadpoolsimple.h

#ifndef _THREADPOOL_H
#define _THREADPOOL_H#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "sys/epoll.h"
#include "wrap.h"typedef struct _PoolTask
{int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数int fd;int epfd;struct epoll_event *evs;}PoolTask ;typedef struct _ThreadPool
{int max_job_num;//最大任务个数int job_num;//实际任务个数PoolTask *tasks;//任务队列数组int job_push;//入队位置int job_pop;// 出队位置int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件pthread_cond_t not_empty_task;//任务队列不为空的条件}ThreadPool;void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum  代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
//void addtask(ThreadPool *pool);//添加任务到线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs);//
void taskRun(void *arg);//任务回调函数#endif

threadpoolsimple.c

//简易版线程池
#include "threadpoolsimple.h"ThreadPool *thrPool = NULL;int beginnum = 1000;void *thrRun(void *arg)
{//printf("begin call %s-----\n",__FUNCTION__);ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&thrPool->pool_lock);//无任务并且线程池不是要摧毁while(thrPool->job_num <= 0 && !thrPool->shutdown ){//如果没有任务,线程会阻塞pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);}if(thrPool->job_num){//有任务需要处理taskpos = (thrPool->job_pop++)%thrPool->max_job_num;//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));task->arg = task;thrPool->job_num--;//task = &thrPool->tasks[taskpos];pthread_cond_signal(&thrPool->empty_task);//通知生产者}if(thrPool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&thrPool->pool_lock);free(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(&thrPool->pool_lock);printf("001\n");task->task_func(task->arg);//执行回调函数printf("002\n");}//printf("end call %s-----\n",__FUNCTION__);
}//创建线程池
void create_threadpool(int thrnum,int maxtasknum)
{printf("begin call %s-----\n",__FUNCTION__);thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));thrPool->thr_num = thrnum;thrPool->max_job_num = maxtasknum;thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁thrPool->job_push = 0;//任务队列添加的位置thrPool->job_pop = 0;//任务队列出队的位置thrPool->job_num = 0;//初始化的任务个数为0thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列//初始化锁和条件变量pthread_mutex_init(&thrPool->pool_lock,NULL);pthread_cond_init(&thrPool->empty_task,NULL);pthread_cond_init(&thrPool->not_empty_task,NULL);int i = 0;thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for(i = 0;i < thrnum;i++){pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程}//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{pool->shutdown = 1;//开始自爆pthread_cond_broadcast(&pool->not_empty_task);//诱杀 int i = 0;for(i = 0; i < pool->thr_num ; i++){pthread_join(pool->threads[i],NULL);}pthread_cond_destroy(&pool->not_empty_task);pthread_cond_destroy(&pool->empty_task);pthread_mutex_destroy(&pool->pool_lock);free(pool->tasks);free(pool->threads);free(pool);
}//添加任务到线程池
void addtask(ThreadPool *pool,int fd,struct epoll_event *evs)
{//printf("begin call %s-----\n",__FUNCTION__);pthread_mutex_lock(&pool->pool_lock);//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(pool->max_job_num <= pool->job_num){pthread_cond_wait(&pool->empty_task,&pool->pool_lock);}int taskpos = (pool->job_push++)%pool->max_job_num;//printf("add task %d  tasknum===%d\n",taskpos,beginnum);pool->tasks[taskpos].tasknum = beginnum++;pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];pool->tasks[taskpos].task_func = taskRun;pool->tasks[taskpos].fd = fd;pool->tasks[taskpos].evs = evs;pool->job_num++;pthread_mutex_unlock(&pool->pool_lock);pthread_cond_signal(&pool->not_empty_task);//通知包身工//printf("end call %s-----\n",__FUNCTION__);
}//任务回调函数
void taskRun(void *arg)
{printf("003\n");PoolTask *task = (PoolTask*)arg;char buf[1024]="";int n = Read(task->fd , buf,sizeof(buf));if(n == 0 ){close(task->fd);//关闭cfdepoll_ctl(task->epfd,EPOLL_CTL_DEL,task->fd,task->evs);//将cfd上树printf("client close\n");}else if(n> 0){printf("%s\n",buf );Write(task->fd ,buf,n);}printf("004\n");}int main()
{create_threadpool(3,20);int i = 0;//创建套接字,绑定int lfd = tcp4bind(8000,NULL);//监听listen(lfd,128);//创建树int epfd = epoll_create(1);struct epoll_event ev,evs[1024];ev.data.fd = lfd;ev.events = EPOLLIN;//监听读事件//将ev上树epoll_ctl(epfd,EPOLL_CTL_ADD,lfd,&ev);while(1){int nready = epoll_wait(epfd,evs,1024,-1);if(nready < 0)perr_exit("err");else if(nready == 0)continue;else if(nready > 0 ){for(int i=0;i<nready;i++){if(evs[i].data.fd == lfd && evs[i].events & EPOLLIN)//如果是lfd变化,并且是读事件{struct sockaddr_in cliaddr;char buf_ip[16]="";socklen_t len  = sizeof(cliaddr);int cfd = Accept(lfd,(struct sockaddr *)&cliaddr,&len);printf("client ip=%s port=%d\n",inet_ntop(AF_INET,&cliaddr.sin_addr.s_addr,buf_ip,sizeof(buf_ip)),ntohs(cliaddr.sin_port));ev.data.fd = cfd;//cfd上树ev.events = EPOLLIN;//监听读事件epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&ev);//将cfd上树}else if(evs[i].events & EPOLLIN)//普通的读事件{printf("###########1\n");addtask(thrPool,evs[i].data.fd,&evs[i]);printf("###########2\n");// char buf[1024]="";// int n = Read(evs[i].data.fd , buf,sizeof(buf));// if(n <= 0 )// {//     close(evs[i].data.fd);//关闭cfd//     epoll_ctl(epfd,EPOLL_CTL_DEL,evs[i].data.fd,&evs[i]);//将cfd上树//     printf("client close\n");// }// else// {//     printf("%s\n",buf );//     Write(evs[i].data.fd ,buf,n);// }}}}}close(lfd);destroy_threadpool(thrPool);return 0;
}

更多推荐

Linux网络编程(六)

本文发布于:2024-02-13 11:00:57,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1758066.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:网络编程   Linux

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!