【IO多路转接】Epoll

编程入门 行业动态 更新时间:2024-10-13 22:19:31

【IO<a href=https://www.elefans.com/category/jswz/34/1768459.html style=多路转接】Epoll"/>

【IO多路转接】Epoll

当文件描述符数量过多时,Select和Poll都会表现出性能偏低的问题,因此Epoll在Poll的基础上做出了改进
按照man手册的说法: 是为处理大批量句柄而作了改进的poll.
它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44)
它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法.

Epoll

  • Epoll相关系统调用
    • epoll_create
    • epoll_ctl
      • 关于epoll_event
    • epoll_wait
  • Epoll工作原理
    • 整个Epoll过程
    • Epoll常见程序片段
  • Epoll的优点
  • Epoll的两种工作方式
    • 水平触发(Level Triggered)
    • 边缘触发(Edge Triggered)
  • 将fd设置为非阻塞IO
    • fcntl函数原型
    • 实现SetNonBlock
  • 对比LT和ET
  • 实现简易版Epoll服务器(LT/ET)
    • LT模式
    • ET模式

Epoll相关系统调用

epoll_create

#include <sys/epoll.h>int epoll_create(int size);

创建一个epoll句柄

  • size参数可被忽略
  • 必须调用close进行关闭

epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 不同于select在监听时告诉内核事件类型,而是先要注册要监听的事件类型,即用户要告诉内核需要关心哪个文件描述符上的哪个事件
  • epfd为epoll_create的返回值(epoll句柄)
  • op为对fd的操作方法
  • fd为要监听的文件描述符
  • event为要监听fd的事件类型

关于op参数

  • EPOLL_CTL_ADD:注册新的fd到epfd中
  • EPOLL_CTL_MOD:修改已注册fd中的监听事件
  • EPOLL_CTL_DEL:从epfd中删除fd

关于epoll_event

typedef union epoll_data
{void        *ptr;int          fd;uint32_t     u32;uint64_t     u64;
} epoll_data_t;struct epoll_event
{uint32_t     events;      /* Epoll events */epoll_data_t data;        /* User data variable */
};
  • epoll_event中包含了events和data,其中events即为要监听事件的类型,data为用户数据,内核不会对用户数据做修改
  • events可以是以下几个宏的集合

EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里

epoll_wait

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  • epoll_wait用来收集epoll中已就绪的事件
  • epoll将已就绪的事件发送到events数组中,这个events需要用户自己管理,内核只负责把数据复制到数组中,而不对数组大小做管理,因此events不能为空指针
  • maxevents为events的大小
  • timeout为超时时间,单位为毫秒(ms)
  • epoll_wait成功返回已就绪文件描述符的个数,返回0表示等待超时,返回小于0表示函数等待失败

Epoll工作原理

Epoll会管理一棵红黑树,红黑树的每一个节点包含了fd,events以及相关链接属性等字段,在当底层通知OS事件就绪时,该“就绪节点”会调用自己的回调机制,将自己链接到就绪队列中,最后内核再将就绪队列中的数据拷贝给用户。

整个Epoll过程

  1. epoll_create()创建epoll
  2. epoll_ctl()注册事件
  3. epoll_wait()等待事件就绪

Epoll常见程序片段

int epfd = epoll_create(size);struct epoll_event ev;
ev.data.fd = sock;
ev.events = EPOLLIN;//以读事件为例epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);//注册该事件struct epoll_event revs[num];
int n = epoll_wait(epfd, revs, num, timeout);for (int i = 0; i < n; ++i)
{int fd = revs[i].data.fd;uint32_t events = revs[i].events;//TODO
}
epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);
close(epfd);

Epoll的优点

  • 接口方便,不需要循环设置文件描述符,管理事件
  • 不需要反复拷贝,在epoll_ctl中的EPOLL_CTL_ADD操作并不频繁,只需要在最开始时将文件描述符拷贝到内核,最后通过EPOLL_CTL_DEL删除即可,而select和poll在每次使用时都需要循环拷贝文件描述符,增大了开销
  • 底层红黑树插入效率高(lgN),而select和poll插入效率都为O(N)
  • 事件回调机制,能够在事件就绪时直接将节点插入到就绪序列,也能避免重复拷贝,即使文件描述符数量过多也不会影响效率
  • 没有文件描述符数量的限制

Epoll的两种工作方式

为了介绍两种工作方式我就举个栗子
一天你正在打游戏,这个时候你麻麻喊你吃饭
一种情况是你麻麻喊了一遍,你没有反应,于是你麻麻喊了第二遍,第三遍…直到你去吃饭,这就是水平触发
另一种情况是你麻麻喊了一遍,你没动,至此之后,你麻麻不管你了再也没喊过你,这就是边缘触发

水平触发(Level Triggered)

epoll默认状态下就是水平触发模式

  • 当epoll检测到有事件就绪时,可以进行处理也可以不处理,又或者处理一部分
  • 如果不处理或处理一部分,下次调用epoll_wait时epoll会继续通知该事件处于就绪状态
  • 直到缓冲区中的数据被读完,epoll_wait不返回
  • 读写支持阻塞读写和非阻塞读写

边缘触发(Edge Triggered)

如果将事件中添加EPOLLET属性,那么epoll的模式会变为ET工作模式

  • 当epoll检测到事件就绪时,必须立刻处理
  • 如果只处理一部分或者不处理,下次调用epoll_wait时则不会通知该事件处于就绪状态
  • 也就是说在ET模式下,处理事件的机会只有一次
  • ET的性能要比LT高,因为epoll_wait的返回次数更少,也可以理解为同样的事件,只通知一次就完成了,效率也就更高了
  • 只支持非阻塞读写

为什么ET模式只支持非阻塞读写?
因为阻塞式读写可能是一次性把数据读完,但也有可能是循环读,如果是循环读就会造成数据一次不能读完,也就破坏了ET模式工作的平衡性。

select和poll的工作模式是LT,而epoll可以支持LT或者ET,ET模式下读写必须采用非阻塞轮询的方式进行。

将fd设置为非阻塞IO

一个文件描述符默认状态下都是阻塞式IO

fcntl函数原型

#include <unistd.h>
#include <fcntl.h>int fcntl(int fd, int cmd, ... /* arg */ );

传入的cmd不同,后面追加的参数也不同
fcntl函数有五种功能

  • 复制一个现有的描述符(cmd=F_DUPFD)
  • 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD)
  • 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL)
  • 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN)
  • 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)

此处用第三种功能就可以将fd设置为非阻塞式IO

实现SetNonBlock

void SetNoBlock(int fd) {
int fl = fcntl(fd, F_GETFL);
if (fl < 0) return;
elsefcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
  • 使用F_GETFL将该fd的属性提取出来(是一个位图)
  • 然后再使用F_SETFL将fd中的属性追加O_NONBLOCK

对比LT和ET

  • LT模式为默认行为,ET模式能减少响应次数,这也就倒逼程序员必须一次响应就绪就将数据取完
  • 但如果在LT模式下也能做到响应一次就绪就将数据取完,二者的性能其实是一样的

实现简易版Epoll服务器(LT/ET)

写之前先封装一下socket

#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>static const int gbacklog = 32;
const int defaultfd = -1;class Sock
{
public:Sock() : _sock(defaultfd){}void Socket(){_sock = socket(AF_INET, SOCK_STREAM, 0);if (_sock < 0){exit(-1);}// 设置地址是复用的int opt = 1;setsockopt(_sock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));}void Bind(const uint16_t &port){struct sockaddr_in local;memset(&local, 0, sizeof(local));local.sin_family = AF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;if (bind(_sock, (struct sockaddr *)&local, sizeof(local)) < 0){exit(-1);}}void Listen(){if (listen(_sock, gbacklog) < 0){exit(-1);}}int Accept(std::string *clientip, uint16_t *clientport){struct sockaddr_in temp;socklen_t len = sizeof(temp);int sock = accept(_sock, (struct sockaddr *)&temp, &len);if (sock < 0){}else{*clientip = inet_ntoa(temp.sin_addr);*clientport = ntohs(temp.sin_port);}return sock;}int Connect(const std::string &serverip, const uint16_t &serverport){struct sockaddr_in server;memset(&server, 0, sizeof(server));server.sin_family = AF_INET;server.sin_port = htons(serverport);server.sin_addr.s_addr = inet_addr(serverip.c_str());return connect(_sock, (struct sockaddr *)&server, sizeof(server));}int Fd(){return _sock;}void Close(){if (_sock != defaultfd)close(_sock);}~Sock(){}private:int _sock;
};

LT模式

///
// 这里只考虑读的情况,并做本地测试
///
#include <iostream>
#include <sys/epoll.h>
#include <functional>
#include <memory>
#include "Sock.hpp"static const int defaultepfd = -1;
static const int gsize = 128;
using func_t = std::function<std::string(std::string)>;class Epoller
{
public:Epoller() : epfd_(defaultepfd){}void Create(){epfd_ = epoll_create(gsize);if (epfd_ < 0){exit(-1);}}// 用户 -> 内核bool AddModEvent(int fd, uint32_t events, int op) //{struct epoll_event ev;ev.events = events;ev.data.fd = fd; // 用户数据, epoll底层不对该数据做任何修改,就是为了给未来就绪返回的!int n = epoll_ctl(epfd_, op, fd, &ev);if (n < 0){return false;}return true;}bool DelEvent(int fd){// epoll在操作fd的时候,有一个要求,fd必须合法!return epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == 0;}int Wait(struct epoll_event *revs, int num, int timeout){//return epoll_wait(epfd_, revs, num, timeout);}int Fd(){return epfd_;}void Close(){if (epfd_ != defaultepfd)close(epfd_);}~Epoller(){}private:int epfd_;
};class EpollServer
{const static int gnum = 64;const static int gport = 8888;public:EpollServer(func_t func, uint16_t port = gport) : func_(func), port_(port) {}void InitServer(){// 创建、绑定、监听listensock_.Socket();listensock_.Bind(gport);listensock_.Listen();epoller_.Create(); // 创建epoll// 将listensock添加到epoll中负责监听新连接epoller_.AddModEvent(listensock_.Fd(), EPOLLIN, EPOLL_CTL_ADD);}void Start(){int timeout = -1;while (true){int n = epoller_.Wait(revs_, gnum, timeout); // 等待事件就绪switch (n){case 0:std::cout << "timeout..." << std::endl;break;case -1:std::cerr << "epoll_wait failed" << std::endl;break;default:HandlerEvent(n);break;}}}void HandlerEvent(int num){for (int i = 0; i < num; ++i){int fd = revs_[i].data.fd;uint32_t events = revs_[i].events;if (events & EPOLLIN) // 读{if (fd == listensock_.Fd()) // 如果是监听成功{std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport); // 获取连接if (sock < 0){// 获取失败则继续获取continue;}else{printf("%s:%d 已经连上了服务器了", clientip.c_str(), clientport);fflush(stdout);// 获取成功则将fd添加到epoll中epoller_.AddModEvent(sock, EPOLLIN, EPOLL_CTL_ADD);}}else // 其他读事件{char request[1024];ssize_t s = recv(fd, request, sizeof(request) - 1, 0);if (s > 0) // 读取成功{request[s - 1] = 0; // \r\nrequest[s - 2] = 0; // \r\nstd::string response = func_(request); // 执行业务处理,以telnet回显为例send(fd, response.c_str(), response.size(), 0);}else // 读取异常{// 在处理异常的时候,先从epoll中移除,然后再关闭epoller_.DelEvent(fd);close(fd);}}}}}~EpollServer(){// 关闭fdlistensock_.Close();epoller_.Close();}private:uint16_t port_;Sock listensock_;Epoller epoller_;func_t func_;struct epoll_event revs_[gnum];
};std::string echoServer(std::string r)
{std::string resp = r;resp += "[echo]\r\n";return resp;
}int main()
{std::unique_ptr<EpollServer> svr(new EpollServer(echoServer));svr->InitServer();svr->Start();return 0;
}

ET模式

ET模式与LT模式类似,不过要在添加fd之前,增加非阻塞属性以及将epoll改为ET模式

#include <iostream>
#include <sys/epoll.h>
#include <functional>
#include <fcntl.h>
#include <memory>
#include "Sock.hpp"static const int defaultepfd = -1;
static const int gsize = 128;
using func_t = std::function<std::string(std::string)>;// 设置非阻塞IO
void SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);if (fl < 0){std::cerr << "error string: " << strerror(errno)<< "error code:" << errno << std::endl;return;}fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}class Epoller
{
public:Epoller() : epfd_(defaultepfd){}void Create(){epfd_ = epoll_create(gsize);if (epfd_ < 0){exit(-1);}}// 用户 -> 内核bool AddModEvent(int fd, uint32_t events, int op) //{struct epoll_event ev;ev.events = events;ev.data.fd = fd; // 用户数据, epoll底层不对该数据做任何修改,就是为了给未来就绪返回的!int n = epoll_ctl(epfd_, op, fd, &ev);if (n < 0){return false;}return true;}bool DelEvent(int fd){// epoll在操作fd的时候,有一个要求,fd必须合法!return epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == 0;}int Wait(struct epoll_event *revs, int num, int timeout){//return epoll_wait(epfd_, revs, num, timeout);}int Fd(){return epfd_;}void Close(){if (epfd_ != defaultepfd)close(epfd_);}~Epoller(){}private:int epfd_;
};class EpollServer
{const static int gnum = 64;const static int gport = 8888;public:EpollServer(func_t func, uint16_t port = gport) : func_(func), port_(port) {}void InitServer(){// 创建、绑定、监听listensock_.Socket();listensock_.Bind(gport);listensock_.Listen();epoller_.Create(); // 创建epoll// 将listensock添加到epoll中负责监听新连接// 这里没有把listensock设置为非阻塞,稍微麻烦一点,此处暂不实现epoller_.AddModEvent(listensock_.Fd(), EPOLLIN, EPOLL_CTL_ADD);}void Start(){int timeout = -1;while (true){int n = epoller_.Wait(revs_, gnum, timeout); // 等待事件就绪switch (n){case 0:std::cout << "timeout..." << std::endl;break;case -1:std::cerr << "epoll_wait failed" << std::endl;break;default:HandlerEvent(n);break;}}}void HandlerEvent(int num){for (int i = 0; i < num; ++i){int fd = revs_[i].data.fd;uint32_t events = revs_[i].events;if (events & EPOLLIN) // 读{if (fd == listensock_.Fd()) // 如果是监听成功{std::string clientip;uint16_t clientport;int sock = listensock_.Accept(&clientip, &clientport); // 获取连接if (sock < 0){// 获取失败则继续获取continue;}else{printf("%s:%d 已经连上了服务器了", clientip.c_str(), clientport);fflush(stdout);SetNonBlock(fd); // 将fd设置为非阻塞// 获取成功则将fd添加到epoll中并设置为ET模式epoller_.AddModEvent(sock, EPOLLIN | EPOLLET, EPOLL_CTL_ADD);}}else // 其他读事件{// 这里写的有一点不严谨do{char request[2048];ssize_t s = recv(fd, request, sizeof(request) - 1, 0);if (s > 0) // 读取成功{request[s - 1] = 0; // \r\nrequest[s - 2] = 0; // \r\nstd::string response = func_(request); // 执行业务处理,以telnet回显为例send(fd, response.c_str(), response.size(), 0);}else if (s == 0){// 对方读端关闭,因此直接breakbreak;}else{if (errno == EAGAIN || errno == EWOULDBLOCK)// EAGAIN和EWOULDBLOCK表示读缓冲区已空,需要重新尝试读取continue;else if (errno == EINTR)// EINTR表示读取被中断,也需要重新尝试读取continue;elsebreak;}} while (events & EPOLLET);}}}}~EpollServer(){// 关闭fdlistensock_.Close();epoller_.Close();}private:uint16_t port_;Sock listensock_;Epoller epoller_;func_t func_;struct epoll_event revs_[gnum];
};std::string echoServer(std::string r)
{std::string resp = r;resp += "[echo]\r\n";return resp;
}int main()
{std::unique_ptr<EpollServer> svr(new EpollServer(echoServer));svr->InitServer();svr->Start();return 0;
}

更多推荐

【IO多路转接】Epoll

本文发布于:2024-03-23 22:43:04,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1743700.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:多路   IO   Epoll

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!