Netty Pipeline源码分析(1)

编程入门 行业动态 更新时间:2024-10-11 19:15:03

Netty Pipeline<a href=https://www.elefans.com/category/jswz/34/1770099.html style=源码分析(1)"/>

Netty Pipeline源码分析(1)

原文链接:/posts/netty-pipeline-source-analyse-1.html

前面,我们分析了Netty EventLoop的 创建 与 启动 原理,接下里我们来分析Netty中另外两个重要组件—— ChannelHandlerPipeline。Netty中I/O事件的传播机制均由它负责,下面我们来看看它是如何实现的。

Netty版本:4.1.30

我们前面在讲 Channel创建 时,在AbstractChannel的构造函数中, 一笔带过地提到了Pipeline,现在我们来深入分析一下它的原理。

概述

Netty channel lifecycle

前面,我们在分析 Netty channel 源码时,分析了Channel的创建、初始化、注册、绑定过程。在Netty中,channel的生命周期如下所示:

  • ChannelRegistered:Channel注册到了EventLoop上
  • ChannelActive:Channel激活,连接到了远程某一个节点上,可以收发数据了
  • ChannelInactive:断开连接
  • ChannelUnregistered:Channel从EventLoop上取消注册

Netty channelHandler

Channel 每一次状态的变化,都会产生一个事件,调用 ChannelHandler 中对应的方法进行处理,我们看下 ChannelHandler的UML,其中最为重要的两个ChannelHandler:

  • ChannelInboundHandler:处理入站数据以及channel的各种状态变化
  • ChannelOutboundHandler:处理出站数据并允许拦截所有操作

Netty ChannelPipeline

前面我们在分析Channel创建过程时,每一个新创建的Channel都将会被分配一个新的ChannelPipeline。ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler实例链,如图所示:

一个 Channel 包含了一个 ChannelPipeline,ChannelPipeline内部是一个双向的链表结构,内部由一个个的ChannelHandlerContext节点组成,ChannelPipeline有头尾两个固定的节点HeadContext与TailContext。用户自定的ChannelHandler就是由ChannelHandlerContext包装成Pipeline的节点,参与Channel整个生命周期中所触发的入站事件与出站事件以及相应数据流的拦截处理。

根据事件的起源,事件将会被ChannelInboundHandler(入站处理器)或者ChannelOutboundHandler(出站处理器)处理。随后,通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler,如图所示:

Pipeline UML

我们先来看下 ChannelPipeline 以及 ChannelHandlerContext 的类图结构,它们都实现了ChannelInboundInvokerChannelOutboundInvoker接口。

Pipeline初始化

AbstractChannel构造函数如下:

protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();// 创建默认Pipelinepipeline = newChannelPipeline();
}// 创建默认Pipeline
protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline 构造函数如下:

protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);// 设置尾部节点tail = new TailContext(this);// 设置头部节点head = new HeadContext(this);// 将tail与head串联起来head.next = tail;tail.prev = head;
}

我们可以看到Pipeline其实是一个双向链表的结构,刚刚初始化的时候,Pipeline(管道)中只有两个节点,如图:

接下来我们看看组成Pipeline节点的对象—— ChannelHandlerContext。

ChannelHandlerContext

ChannelHandlerContext 实现了AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker接口。Pipeline中的事件传播,都是由ChannelHandlerContext负责,将发生的事件从一个节点传到下一个节点。

ChannelHandlerContext接口
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {// 返回ChannelHandlerContext中绑定的ChannelChannel channel();// 返回专用于执行任务的 EventExecutorEventExecutor executor();// 返回ChannelHandlerContext的唯一名称。该名字将在ChannelHandler被添加到ChannelPipeline时会被用到,从ChannelPipeline中访问注册的ChannelHandler时,也会被用到。String name();// 返回ChannelHandlerContext中绑定的ChannelHandlerChannelHandler handler();// 属于这个ChannelHandlerContext的ChannelHandler从ChannelPipeline移除了,返回trueboolean isRemoved();@OverrideChannelHandlerContext fireChannelRegistered();@OverrideChannelHandlerContext fireChannelUnregistered();@OverrideChannelHandlerContext fireChannelActive();@OverrideChannelHandlerContext fireChannelInactive();@OverrideChannelHandlerContext fireExceptionCaught(Throwable cause);@OverrideChannelHandlerContext fireUserEventTriggered(Object evt);@OverrideChannelHandlerContext fireChannelRead(Object msg);@OverrideChannelHandlerContext fireChannelReadComplete();@OverrideChannelHandlerContext fireChannelWritabilityChanged();@OverrideChannelHandlerContext read();@OverrideChannelHandlerContext flush();// 返回分配的ChannelPipelineChannelPipeline pipeline();// 返回用于分配ByteBuf的ByteBufAllocatorByteBufAllocator alloc();}
AttributeMap接口

实现 AttributeMap 接口,表示ChannelHandlerContext节点可以存储自定义的属性。

// 属性Map接口
public interface AttributeMap {// 通过Key获取属性<T> Attribute<T> attr(AttributeKey<T> key);// 判断属性是否存在<T> boolean hasAttr(AttributeKey<T> key);
}
ChannelInboundInvoker接口

实现ChannelInboundInvoker接口,表示节点可以用于传播入站相关的事件。

public interface ChannelInboundInvoker {// 当Channel注册到EventLoop上时// 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法 ChannelInboundInvoker fireChannelRegistered();// 当Channel从EventLoop上取消注册// 调用ChannelPipeline中下一个ChannelInboundHandler的channelUnregistered(ChannelHandlerContext)方法 ChannelInboundInvoker fireChannelUnregistered();// 当Channel处理激活状态,意味着连接已经建立// 调用ChannelPipeline中下一个ChannelInboundHandler的channelActive(ChannelHandlerContext)方法 ChannelInboundInvoker fireChannelActive();// 当Channel处理失效状态,意味着连接已经断开// 调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive(ChannelHandlerContext)方法 ChannelInboundInvoker fireChannelInactive();// 在pipeline中某个一个入站(inbound)操作出现了异常// 调用ChannelPipeline中下一个ChannelInboundHandler的exceptionCaught(ChannelHandlerContext)方法 ChannelInboundInvoker fireExceptionCaught(Throwable cause);// 收到用户自定义的事件// 调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered(ChannelHandlerContext)方法ChannelInboundInvoker fireUserEventTriggered(Object event);// Channel接收到了消息// 调用ChannelPipeline中下一个ChannelInboundHandler的channelRead(ChannelHandlerContext)方法ChannelInboundInvoker fireChannelRead(Object msg);// 调用ChannelPipeline中下一个ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法ChannelInboundInvoker fireChannelReadComplete();// 调用ChannelPipeline中下一个ChannelInboundHandler的channelWritabilityChanged(ChannelHandlerContext)方法ChannelInboundInvoker fireChannelWritabilityChanged();
}
ChannelOutboundInvoker接口

实现ChannelOutboundInvoker接口,意味着节点可以用来处理出站相关的事件。

public interface ChannelOutboundInvoker {// 将Channel绑定到一个本地地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的bind(ChannelHandlerContext, Socket- Address, ChannelPromise)方法ChannelFuture bind(SocketAddress localAddress);ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);// 将Channel连接到一个远程地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的connect(ChannelHandlerContext, Socket- Address, ChannelPromise)方法ChannelFuture connect(SocketAddress remoteAddress);ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);// 将Channel断开连接。这将调用ChannelPipeline中的下一个ChannelOutbound- Handler的disconnect(ChannelHandlerContext, Channel Promise)方法ChannelFuture disconnect();ChannelFuture disconnect(ChannelPromise promise);// 将Channel关闭。这将调用ChannelPipeline中的下一个ChannelOutbound- Handler的close(ChannelHandlerContext, ChannelPromise)方法ChannelFuture close();ChannelFuture close(ChannelPromise promise);// 将Channel从它先前所分配的EventExecutor(即EventLoop)中注销。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的deregister (ChannelHandlerContext, ChannelPromise)方法ChannelFuture deregister();ChannelFuture deregister(ChannelPromise promise);// 请求从Channel中读取更多的数据。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的read(ChannelHandlerContext)方法ChannelOutboundInvoker read();// 将消息写入Channel。这将调用ChannelPipeline中的下一个Channel- OutboundHandler的write(ChannelHandlerContext, Object msg, Channel- Promise)方法。注意:这并不会将消息写入底层的Socket,而只会将它放入队列中。要将它写入Socket,需要调用flush()或者writeAndFlush()方法 ChannelFuture write(Object msg);ChannelFuture write(Object msg, ChannelPromise promise);// 冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个Channel- OutboundHandler的flush(ChannelHandlerContext)方法ChannelOutboundInvoker flush();// 这是一个先调用write()方法再接着调用flush()方法的便利方法ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);ChannelFuture writeAndFlush(Object msg);ChannelPromise newPromise();ChannelProgressivePromise newProgressivePromise();ChannelFuture newSucceededFuture();ChannelFuture newFailedFuture(Throwable cause);ChannelPromise voidPromise();
}

TailContext & HeadContext

接下来,我们看看Pipeline中的头部与尾部节点。

TailContext节点

TailContext是尾部节点,inbound类型,主要处理Pipeline中数据流的收尾工作。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {// 调用AbstractChannelHandlerContext构造器// TailContext是一个inbound(入站)节点super(pipeline, null, TAIL_NAME, true, false);// 设置添加完成setAddComplete();}// 返回Handler,就是它自身@Overridepublic ChannelHandler handler() {return this;}...@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {onUnhandledInboundException(cause);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {onUnhandledInboundMessage(msg);}...
}// 如果pipeline中有异常没做处理,最终会由TailContext打赢一个警告日志
protected void onUnhandledInboundException(Throwable cause) {try {logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +"It usually means the last handler in the pipeline did not handle the exception.",cause);} finally {// 释放对象ReferenceCountUtil.release(cause);}
}// 如果pipeline中有read消息没有处理,最终会由TailContext打赢一个警告日志
protected void onUnhandledInboundMessage(Object msg) {try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. " +"Please check your pipeline configuration.", msg);} finally {ReferenceCountUtil.release(msg);}
}// 设置 ChannelHandlerContext 状态为添加完成,状态=2
final void setAddComplete() {for (;;) {int oldState = handlerState;if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATERpareAndSet(this, oldState, ADD_COMPLETE)) {return;}}
}
AbstractChannelHandlerContext

AbstractChannelHandlerContext 是 ChannelHandlerContext 的抽象实现:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMapimplements ChannelHandlerContext, ResourceLeakHint {...// 下一个节点volatile AbstractChannelHandlerContext next;// 上一个节点volatile AbstractChannelHandlerContext prev;// 是否为inBound类型private final boolean inbound;// 是否为outbound类型private final boolean outbound;// 绑定的默认pipelineprivate final DefaultChannelPipeline pipeline;// 节点名private final String name;private final boolean ordered;// Will be set to null if no child executor should be used, otherwise it will be set to the// child executor.final EventExecutor executor;...AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {// 设置HandlerContext名称this.name = ObjectUtil.checkNotNull(name, "name");// 绑定pipelinethis.pipeline = pipeline;// 绑定executor(这里为null)this.executor = executor;// 如果节点为inbound类型就设置为truethis.inbound = inbound;// 如果节点为outbound类型就设置为truethis.outbound = outbound;// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.ordered = executor == null || executor instanceof OrderedEventExecutor;}...}
DefaultChannelHandlerContext
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {private final ChannelHandler handler;DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {// 调用 AbstractChannelHandlerContext 构造函数super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler == null) {throw new NullPointerException("handler");}this.handler = handler;}@Overridepublic ChannelHandler handler() {return handler;}// 是否为inBound类型private static boolean isInbound(ChannelHandler handler) {return handler instanceof ChannelInboundHandler;}// 是否为outBound类型private static boolean isOutbound(ChannelHandler handler) {return handler instanceof ChannelOutboundHandler;}
}
HeadContext

HeadContext是头部节点,outbound类型,用于传播事件和进行一些底层socket操作。

final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {// 调用AbstractChannelHandlerContext构造器// HeadContext是一个outbound(出站)节点super(pipeline, null, HEAD_NAME, false, true);// 设置Unsafe对象unsafe = pipeline.channel().unsafe();// 设置添加完成setAddComplete();}// 返回ChannelHandler,就只它自身@Overridepublic ChannelHandler handler() {return this;}@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {// 调用 unsafe 进行bind操作    unsafe.bind(localAddress, promise);}@Overridepublic void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) throws Exception {// 调用 unsafe 进行 connect 操作unsafe.connect(remoteAddress, localAddress, promise);}@Overridepublic void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {// 调用 unsafe 进行 disconnect 操作unsafe.disconnect(promise);}@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {// 调用 unsafe 进行 close 操作unsafe.close(promise);}@Overridepublic void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {// 调用 unsafe 进行 deregister 操作unsafe.deregister(promise);}@Overridepublic void read(ChannelHandlerContext ctx) {// 调用 unsafe 进行 read 操作unsafe.beginRead();}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {// 调用 unsafe 进行 write 操作unsafe.write(msg, promise);}@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {// 调用 unsafe 进行 flush 操作unsafe.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 传播ExceptionCaught事件ctx.fireExceptionCaught(cause);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {invokeHandlerAddedIfNeeded();// 传播channelRegistered事件ctx.fireChannelRegistered();}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {// 传播channelUnregistered事件ctx.fireChannelUnregistered();// Remove all handlers sequentially if channel is closed and unregistered.if (!channel.isOpen()) {destroy();}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 传播 channelActive 事件ctx.fireChannelActive();// 在 /posts/netty-channel-source-analyse.html 中分析过了// 主要是在channel激活之后,向底层的selector注册一个SelectionKey.OP_ACCEPT监听事件// 这样channel在连接之后,就可以监听到一个read事件readIfIsAutoRead();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 传播 channelInactive 事件ctx.fireChannelInactive();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 传播 channelRead 事件ctx.fireChannelRead(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 传播 channelReadComplete 事件ctx.fireChannelReadComplete();// readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 传播 userEventTriggered 事件ctx.fireUserEventTriggered(evt);}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {// 传播 channelWritabilityChanged 事件ctx.fireChannelWritabilityChanged();}
}

Pipeline 节点添加

上面我们分析了Pipeline的基本结构,接下来我们看看Pipeline添加节点(也就是Handler处理器)的过程。该过程主要分为三步:

  • 判断是否重复添加
  • 创建节点并添加至链表
  • 回调添加完成事件

以这段常见的代码为例:

ServerBootstrap b = new ServerBootstrap();
b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE).childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 添加 serverHandlerch.pipeline().addLast(serverHandler);}});
ChannelFuture f = b.bind().sync();

我们从 ChannelPipeline.addLast() 方法进去:

public class DefaultChannelPipeline implements ChannelPipeline {...@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {if (handlers == null) {throw new NullPointerException("handlers");}// 循环处理for (ChannelHandler h: handlers) {if (h == null) {break;}addLast(executor, null, h);}return this;}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {// 检查是否重复checkMultiplicity(handler);// 创建新节点newCtx = newContext(group, filterName(name, handler), handler);// 添加新节点addLast0(newCtx);// 如果 registered 为 false,则表示这个channel还未注册到EventLoop上.// 在这种情况下,我们添加一个Task到PendingHandlerCallback中,// 等到这个channel注册成功之后,将会调用立即调用 ChannelHandler.handlerAdded(...) 方法,已达到channel添加的目的if (!registered) {// 设置为待添加状态newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}// 获取executorEventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {// 设置为待添加状态newCtx.setAddPending();executor.execute(new Runnable() {@Overridepublic void run() {// 回调添加完成事件callHandlerAdded0(newCtx);}});return this;}}// 回调添加完成事件callHandlerAdded0(newCtx);return this;}// 检查是否重复private static void checkMultiplicity(ChannelHandler handler) {// handler是否为ChannelHandlerAdapter类型,不是则不做处理if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;// 判断handler是否添加了Sharable注解 && 是否添加过了if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}// 创建新的节点private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {// 调用DefaultChannelHandlerContext的构造函数return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}// 在tail节点之前添加新节点private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}// 回调ChannelHandler中的handlerAdded方法private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {// 我们必须在handlerAdded方法之前调用setAddComplete方法。否则的话,一旦handlerAdded方法产生了任何pipeline事件,由于状态的缘故,ctx.handler()将会丢失这些事件的处理。// 设置新节点的状态为添加完成状态ctx.setAddComplete();// 调用handlerAdded接口ctx.handler().handlerAdded(ctx);} catch (Throwable t) {...// 如果添加失败,则删除新节点    remove0(ctx);...}}...}

我们来看下setAddComplete()方法:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMapimplements ChannelHandlerContext, ResourceLeakHint {...// 通过自旋操作,设置状态为ADD_COMPLETEfinal void setAddComplete() {for (;;) {int oldState = handlerState;// Ensure we never update when the handlerState is REMOVE_COMPLETE already.// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not// exposing ordering guarantees.if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATERpareAndSet(this, oldState, ADD_COMPLETE)) {return;}}}...// 设置为 ADD_PENDING 状态final void setAddPending() {boolean updated = HANDLER_STATE_UPDATERpareAndSet(this, INIT, ADD_PENDING);assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().}...
}

回调用户自定义Handler中的handlerAdded方法:

@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {...@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.printf("ServerHandler added ....");}...}    

ChannelInitializer

关于回调ChannelHandler中的handlerAdded()方法,最常见的一个场景就是,使用 ChannelInitializer 来添加我们自定义的ChannelHandler。ChannelInitializer被添加完成之后,会回调到它的 initChannel 方法。

接下来,我们看看 ChannelInitializer 这个类,它是一个特殊的ChannelInboundHandler,它提供了一种在Channel注册到EventLoop后初始化Channel的简便方法。

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();/*** 当 ch 注册成功之后,该方法就会被调用,该方法结束返回之后,此ChannelInitializer实例将会从Channel所绑定的ChannelPipeline中移除* * @param ch 所注册的Channel* */protected abstract void initChannel(C ch) throws Exception;...// ChannelInitializer 添加成功后,会回调到handlerAdded()接口@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {// This should always be true with our current DefaultChannelPipeline implementation.// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers// will be added in the expected order.initChannel(ctx);}}  @SuppressWarnings("unchecked")private boolean initChannel(ChannelHandlerContext ctx) throws Exception {// 标记ctx为true,且之前没有标记过。防止重复执行if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {try {// 调用initChannel方法initChannel((C) ctx.channel());} catch (Throwable cause) {// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).// We do so to prevent multiple calls to initChannel(...).exceptionCaught(ctx, cause);} finally {// 最终会删除 ChannelInitializer 实例remove(ctx);}return true;}return false;}// 删除 ChannelInitializer 实例private void remove(ChannelHandlerContext ctx) {try {// 获取 PipelineChannelPipeline pipeline = ctx.pipeline();// 从 Pipeline 中返回 ChannelInitializer 实例if (pipeline.context(this) != null) {// 删除 ChannelInitializer 实例// 删除逻辑请看下一小节pipeline.remove(this);}} finally {initMap.remove(ctx);}}}

遍历 ChannelHandlerContext 节点查询出ChannelHandler实例

public class DefaultChannelPipeline implements ChannelPipeline {...// 通过handler获取ChannelHandlerContext@Overridepublic final ChannelHandlerContext context(ChannelHandler handler) {if (handler == null) {throw new NullPointerException("handler");}AbstractChannelHandlerContext ctx = head.next;for (;;) {if (ctx == null) {return null;}if (ctx.handler() == handler) {return ctx;}ctx = ctx.next;}}...}    

Pipeline中除了addLast方法外, 还有addFirst、addBefore、addAfter等方法,逻辑类似,可以自行研究学习。

Pipeline 节点删除

上面,我们讲了Pipeline节点的添加,这小结我们看看Pipeline节点的删除功能。

netty 有个最大的特性之一就是Handler可插拔,做到动态编织pipeline,比如在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此context移除,下次pipeline在传播事件的时候就就不会调用到权限认证处理器。

下面是权限认证Handler最简单的实现,第一个数据包传来的是认证信息,如果校验通过,就删除此Handler,否则,直接关闭连接

// 鉴权Handler
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {...@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {if (verify(authDataPacket)) {ctx.pipeline().remove(this);} else {ctx.close();}}private boolean verify(ByteBuf byteBuf) {//...}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("AuthHandler has been removed ! ");}
}

我们来看看 DefaultChannelPipeline 中的 remove 方法:

public class DefaultChannelPipeline implements ChannelPipeline {...// 从Pipeline中删除ChannelHandler@Overridepublic final ChannelPipeline remove(ChannelHandler handler) {remove(getContextOrDie(handler));return this;}...// 获取 ChannelHandler ,获取不到就抛出异常private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);if (ctx == null) {throw new NoSuchElementException(handler.getClass().getName());} else {return ctx;}}...// 删除private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {// ctx不能为heand与tailassert ctx != head && ctx != tail;synchronized (this) {// 从pipeline中删除ChannelHandlerContext节点remove0(ctx);// 如果为false,则表明channel还没有注册到eventloop上// 在删除这种场景下,我们先添加一个Task,一旦channel注册成功就会调用这个Task,这个Task就会立即调用ChannelHandler.handlerRemoved(...)方法,来从pipeline中删除context。if (!registered) {callHandlerCallbackLater(ctx, false);return ctx;}EventExecutor executor = ctx.executor();if (!executor.inEventLoop()) {executor.execute(new Runnable() {@Overridepublic void run() {// 回调 handlerRemoved 方法callHandlerRemoved0(ctx);}});return ctx;}}// 回调 handlerRemoved 方法callHandlerRemoved0(ctx);return ctx;}...// 删除节点 ChannelHandlerContextprivate static void remove0(AbstractChannelHandlerContext ctx) {AbstractChannelHandlerContext prev = ctx.prev;AbstractChannelHandlerContext next = ctx.next;prev.next = next;next.prev = prev;}...private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {// Notify the complete removal.try {try {// 回调 handlerRemoved 方法// 也就是我们前面例子 AuthHandler 中的 handlerRemoved() 方法ctx.handler().handlerRemoved(ctx);} finally {// 设置为ctx 状态为 REMOVE_COMPLETE ctx.setRemoved();}} catch (Throwable t) {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));}}...}    

好了, 删除的逻辑就分析到这里了。

小结

这一讲我们分析了Pipeline的创建过程,了解Pipeline中的链表结构以及每个节点的数据结构。还分析了Pipeline是如何添加节点的,又是如何删除节点的。接下来 ,我们会分析Pipeline如何进行事件传播的。

参考资料

  • Java读源码之Netty深入剖析
  • 《Netty in action》

转载于:

更多推荐

Netty Pipeline源码分析(1)

本文发布于:2024-02-17 17:35:11,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1694858.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:源码   Netty   Pipeline

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!