Netty源码解析(四)接收客户端

编程入门 行业动态 更新时间:2024-10-24 20:16:21

Netty<a href=https://www.elefans.com/category/jswz/34/1770099.html style=源码解析(四)接收客户端"/>

Netty源码解析(四)接收客户端

上一篇我们学完了NioServerSocketChannel创建,初始化,注册到selector,添加感兴趣事件,相当于完成了Nio的如下几步

//创建一个ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//创建一个SelectorSelector selector = Selector.open();//绑定6666端口serverSocketChannel.bind(new InetSocketAddress(6666));// 设置连接非阻塞serverSocketChannel.configureBlocking(false);//注册到selectorserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

接下来我们学习轮询接收客户端请求事件,即NioEventLoop的run方法

@Overrideprotected void run() {for (;;) {try {try {//hasTasks()  若taskQueue or  tailTasks任务队列中有任务  返回false  没有则返回true
//                  //有任务返回selectnow的返回值   没任务返回-1switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT://首先轮询注册到reactor线程对应的selector上的所有的channel的IO事件//wakenUp 表示是否应该唤醒正在阻塞的select操作,netty在每次进行新的loop之前,都会将wakeUp 被设置成false,标志新的一轮loop的开始select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;//处理jdk的空轮训bugif (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {//2.处理产生网络IO事件的channelprocessSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;//3.处理任务队列runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {//关闭所有通道closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}

当执行完任务队列的任务后,selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())返回-1,进入SELECT,

/*这里主要是处理阻塞情况。netty在阻塞中会额外处理一些事件,不会让reactor线程一直等待,这样也提高了效率*/private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();//当scheduledTaskQueue为空时 selectDeadLineNanos=当前时间加一秒//这里就是定时任务开始执行的时间long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {//1.定时任务截止事时间快到了,中断本次轮询,也就是判断如果定时任务队列有任务开始<=0.5ms,则终止本次轮询long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;//当前的定时任务队列中有任务的截止事件快到了(<=0.5ms),就跳出循环。if (timeoutMillis <= 0) {//如果到目前还没有进行过select操作  调用selectNow()if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.// 2.轮询过程中发现有任务加入,中断本次轮询 netty为了保证任务队列能够及时执行,在进行阻塞select操作之前会判断任务队列是否为空,如果不为空,就执行一次非阻塞select操作,跳出循环//hasTasks() && wakenUppareAndSet(false, true)  如果队列中有任务  则设置wakenUp为true  并返回trueif (hasTasks() && wakenUppareAndSet(false, true)) {//进行一次不阻塞轮询selector.selectNow();//设置次数为1selectCnt = 1;break;}//阻塞式select操作//执行到这一步,说明netty任务队列里面队列为空,并且所有定时任务延迟时间还未到(大于0.5ms),//于是,在这里进行一次阻塞select操作,截止到第一个定时任务的截止时间int selectedKeys = selector.select(timeoutMillis);selectCnt ++;//轮询到IO事件进入if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - 轮询到io事件// - oldWakenUp 参数为true// - 用户主动唤醒// - 任务队列里面有任务// - 第一个定时任务即将要被执行break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();//处理空轮训的bug//现在的时间-select阻塞的时间=>运行之前的时间if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis在没有选择任何内容的情况下运行。selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {//如果selectCnt>=512就重新创建新的selector并替换//创建新的selectorselector = selectRebuildSelector(selectCnt);selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}}

当有客户端接入时,selectedKeys !=0,会跳出循环,从这里就显示出了netty对于效率的追求,不会一直阻塞线程,而是在阻塞的空闲时间完成taskQueue的任务
客户端接入后,我们进入处理IO事件的processSelectedKeys方法

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();// k.isValid()告知此键是否有效。if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {return;}if (eventLoop != this || eventLoop == null) {return;}unsafe.close(unsafe.voidPromise());return;}try {//获取此键的 ready 操作集合。int readyOps = k.readyOps();//处理不同事件,连接事件,读写事件if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See  ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}//第一次肯定是接收事件,进入此方法if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}

当有客户端接入时,一定会走到unsafe.read();方法,进入方法,这里的unsafe是服务端的unsafe,所以走到AbstaractNioMessageChannel

 @Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config = config();//pipelinefinal ChannelPipeline pipeline = pipeline();//分配一块内存final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {//NioServerSocketChannel接收客户端连接的方法int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}//处理多个客户端int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;//在serverSocketChanne的pipeline中将得到的socketChannel向下传播pipeline.fireChannelRead(readBuf.get(i));}//清空列表readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See  (!readPending && !config.isAutoRead()) {removeReadOp();}}}

我们重点关注两个方法,doReadMessages是NioServerSocketChannel接收客户端连接的方法,

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {//javaChannel()  jdk的ServerSocketChannel   接收到的ch为Nio的SocketChannelSocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {//对SocketChannel进行封装,将接收到的客户端封装成NioSocketChannel添加到列表中buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}

SocketUtils.accept(javaChannel())方法

public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {try {return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {@Overridepublic SocketChannel run() throws IOException {//接收客户端连接,NIO的accept方法return serverSocketChannel.accept();}});} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}}

new NioSocketChannel方法,和new NioServerSocketChannel方法类似,只是其父类是AbstractNioByteChannel,并且传入的感兴趣事件是READ事件

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);}

会将接收到的SocketChannel封装成NioSocketChannel,加入到缓存中,然后再关注pipeline.fireChannelRead(readBuf.get(i));方法循环处理接收到的SocketChannel,我们之前说到过目前NioServerSocketChannel的pipeline如下结构

我们关注ServerBootstartAcceptor的channelRead()方法,我们要重点分析ServerBootstrapAcceptor类
其构造方法如下

ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {//workerGroup  childGroupthis.childGroup = childGroup;//我们自定义的ChannelInitializerthis.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}

里面保存着我们的配置参数,现在NioSocketChannel中的pipeline如下:

再来看ServerBootstrapAcceptor的channelRead()方法

/*处理socketChannel的方法*/@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {//msg  NioSocketChannelfinal Channel child = (Channel) msg;//将channelInitializer添加到pipeline中child.pipeline().addLast(childHandler);//设置参数setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {//将NioSocketChannel注册到Selector中childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}

其中register方法和server端一样,这里不再详细说明,注册成功后,也会执行handlerAdded,channelRegister,ChannelRegister,channelActive方法,此时的pipeline如下

然后会执行pipeline.fireChannelReadComplete();这个方法也和NioServerSocketChannel添加感兴趣事件方法相同,会将我们在生成NioSocketChannel时添加的READ的事件注册到selector,到这里,整体流程就完成了。
下一篇将会讲对交互消息的处理

更多推荐

Netty源码解析(四)接收客户端

本文发布于:2024-03-12 22:53:47,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1732602.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   客户端   Netty

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!