Netty源码解析(五)信息交互

编程入门 行业动态 更新时间:2024-10-24 18:23:53

Netty<a href=https://www.elefans.com/category/jswz/34/1770099.html style=源码解析(五)信息交互"/>

Netty源码解析(五)信息交互

前几篇文章主要讲解了netty从服务端启动到接收到客户端连接并为客户端添加感兴趣事件的流程,接下来主要讲解信息的写出流程,至于解码流程,知识比较单一,以后会单独出一篇讲解
我们都知道,我们写入消息用ctx.writeAndFlush()方法,或者用ctx.channel().writeAndFlush()方法,但是两者也是有差别的
加入我们的pipeline结构如下

如果我们在TestInHandler的channelRead()方法中通过ctx.writeAndFlush()写入消息,那么会从TestInHandler直接走到outboundhandler1,然后到head,然后发出,如果使用ctx.channel().writeAndFlush(),那么消息会从tail出发,先走outboundhander2->outboundhandler1->head,写出到服务端或者客户端。如果outboundhandler2是一个编码器,则必须要第二种方法写出。
我们主要研究head中将信息写出的过程,直接查看ctx.writeAndFlush()

/**** @param msg 要发送的消息* @param flush true* @param promise 保存通道信息*/private void write(Object msg, boolean flush, ChannelPromise promise) {ObjectUtil.checkNotNull(msg, "msg");try {if (isNotValidPromise(promise, true)) {ReferenceCountUtil.release(msg);// cancelledreturn;}} catch (RuntimeException e) {ReferenceCountUtil.release(msg);throw e;}//直接从当前handler开始,找下一个outboundhandlerfinal AbstractChannelHandlerContext next = findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m = pipeline.touch(msg, next);EventExecutor executor = next.executor();//已经创建好reactor线程if (executor.inEventLoop()) {//flush传的是trueif (flush) {//next是下一个outboundhandler,进入此方法next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {final AbstractWriteTask task;if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise);}  else {task = WriteTask.newInstance(next, m, promise);}if (!safeExecute(executor, task, promise, m)) {// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes// and put it back in the Recycler for re-use later.//// See .task.cancel();}}}

我们主要研究next.invokeWriteAndFlush方法

 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);invokeFlush0();} else {writeAndFlush(msg, promise);}}

可以看到,此方法分了两个步骤完成,先是write进通道,然后flush传输

private void invokeWrite0(Object msg, ChannelPromise promise) {try {((ChannelOutboundHandler) handler()).write(this, msg, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}}

还是和之前的方法一样,逐个调用。这里我们不再分析数据从每个handler之间的传输,head节点是最后发送数据的节点,所以我们直接查看head节点的write方法

@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise);}

调用unsafe类的write方法

@Overridepublic final void write(Object msg, ChannelPromise promise) {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;//判空方法if (outboundBuffer == null) {// If the outboundBuffer is null we know the channel was closed and so// need to fail the future right away. If it is not null the handling of the rest// will be done in flush0()// See (promise, newClosedChannelException(initialCloseCause));// release message now to prevent resource-leakReferenceCountUtil.release(msg);return;}int size;try {//处理msg,filterOutboundMessage是将堆内内存转换为堆外内存,也就说明最终写出的数据肯定是堆外内存msg = filterOutboundMessage(msg);size = pipeline.estimatorHandle().size(msg);if (size < 0) {size = 0;}} catch (Throwable t) {safeSetFailure(promise, t);ReferenceCountUtil.release(msg);return;}//连接信息outboundBuffer.addMessage(msg, size, promise);}

我们主要看addMessage方法

public void addMessage(Object msg, int size, ChannelPromise promise) {Entry entry = Entry.newInstance(msg, size, total(msg), promise);if (tailEntry == null) {flushedEntry = null;} else {Entry tail = tailEntry;tail.next = entry;}tailEntry = entry;if (unflushedEntry == null) {unflushedEntry = entry;}// increment pending bytes after adding message to the unflushed arrays.// See (entry.pendingSize, false);}

我们可以看到有三个指针,分别是flushedEntry,unflushedEntry,tailEntry,顾名思义,flushedEntry是已经写出的Entry,unflushedEntry是还没有写的Entry,tailEntry指向最后一个entry
当有数据进入时,会先封装成entry,因为还没有处理数据,所以这里tailEntry和unflushedEntry都指向此节点,当有更多节点进入时,unflushedEntry会指向第一个节点,tailEntry指向最后一个节点,如下图

incrementPendingOutboundBytes方法判断是否最大发送字节数,超过上限则先设置后面的数据为不可读
调用到AbstractChannel的flush方法

 @Overridepublic final void flush() {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {return;}outboundBuffer.addFlush();flush0();}

我们查看outboundBuffer.addFlush();

 public void addFlush() {// There is no need to process all entries if there was already a flush before and no new messages// where added in the meantime.//// See  entry = unflushedEntry;if (entry != null) {if (flushedEntry == null) {// there is no flushedEntry yet, so start with the entryflushedEntry = entry;}do {flushed ++;if (!entry.promise.setUncancellable()) {// Was cancelled so make sure we free up memory and notify about the freed bytesint pending = entry.cancel();//遍历节点,减去写出去的节点decrementPendingOutboundBytes(pending, false, true);}entry = entry.next;} while (entry != null);// All flushed so reset unflushedEntryunflushedEntry = null;}}

执行完成后,Entry链的结构如下

然后调用到flush0()方法,会一直调用到AbstractNioByteChannel(NioSocketChannel父类)的doWriteInternal方法

//写出去的方法private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {/*** 判断msg的不同类型,做不同操作*/if (msg instanceof ByteBuf) {ByteBuf buf = (ByteBuf) msg;//判空if (!buf.isReadable()) {in.remove();return 0;}//写方法final int localFlushedAmount = doWriteBytes(buf);if (localFlushedAmount > 0) {in.progress(localFlushedAmount);if (!buf.isReadable()) {in.remove();}return 1;}} else if (msg instanceof FileRegion) {FileRegion region = (FileRegion) msg;if (region.transferred() >= region.count()) {in.remove();return 0;}long localFlushedAmount = doWriteFileRegion(region);if (localFlushedAmount > 0) {in.progress(localFlushedAmount);if (region.transferred() >= region.count()) {in.remove();}return 1;}} else {// Should not reach here.throw new Error();}return WRITE_STATUS_SNDBUF_FULL;}

查看doWriteBytes方法

@Overrideprotected int doWriteBytes(ByteBuf buf) throws Exception {final int expectedWrittenBytes = buf.readableBytes();//写出数据return buf.readBytes(javaChannel(), expectedWrittenBytes);}

再看readBytes

@Overridepublic int readBytes(GatheringByteChannel out, int length) throws IOException {checkReadableBytes(length);int readBytes = getBytes(readerIndex, out, length, true);readerIndex += readBytes;return readBytes;}
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {ensureAccessible();ByteBuffer tmpBuf;if (internal) {tmpBuf = internalNioBuffer();} else {tmpBuf = ByteBuffer.wrap(array);}//通过通道写出return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length));}

可以看到消息已经写出去

我们再来看下in.remove()方法

public boolean remove() {Entry e = flushedEntry;if (e == null) {clearNioBuffers();return false;}Object msg = e.msg;ChannelPromise promise = e.promise;int size = e.pendingSize;//移动指针的方法removeEntry(e);if (!e.cancelled) {// only release message, notify and decrement if it was not canceled before.ReferenceCountUtil.safeRelease(msg);safeSuccess(promise);decrementPendingOutboundBytes(size, false, true);}// recycle the entrye.recycle();return true;}

点到removeentry

private void removeEntry(Entry e) {//节点全都写完后,三个标志都指向空if (-- flushed == 0) {flushedEntry = null;if (e == tailEntry) {tailEntry = null;unflushedEntry = null;}} else {//指向下一个节点flushedEntry = e.next;}}

节点全都写完后,恢复初始状态

更多推荐

Netty源码解析(五)信息交互

本文发布于:2024-03-12 22:54:11,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1732603.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   信息   Netty

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!