几个组件介绍】"/>
【Netty 几个组件介绍】
一、eventLoop
EventLoop
EventLoop 本质是一个单线程执行器 (同时维护了一个Selector), 里面有run 方法处理一个或者多个channel 上源源不断的io事件。
它的继承关系如下:
- 继承自j.u.c.ScheduleExecutorService 因此包含了线程池中所有的方法
- 继承自 netty 自己的OrdererEventExecutor
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
- 提供了EventLoopGroup parent() 方法来看自己属于哪个EventLoopGroup
EventLoopGroup
EventLoopGroup 是一组EventLoop, Channel 一般会调用
EventLoopGroup 的register 方法来绑定其中一个 EventLoop, 后续这个Channel 上的 io事件都是由此EventLoop来处理 (保证了io事件处理时的线程安全)
- 继承自netty 自己的EventExecutorGroup
- 实现了Iterable 接口提供遍历 EventLoop的能力
- 另外 next可以获取到下一个 EventLoop
处理普通与定时任务
代码:
package com.xlgponentty.eventLoop;import java.util.concurrent.TimeUnit;import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;/*** @author wangqingwei* Created on 2022-06-08*/
public class TestEventLoop {public static void main(String[] args) {// 1. 创建事件循环组/*nthreads == 0 ? Math.max(1, SystemPropertyUtil.getInt("ioty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)) : nthreads;*/EventLoopGroup group = new NioEventLoopGroup(2); // io 事件, 普通任务, 定时任务// EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务, 定时任务// 2. 获取下一个事件循环对象// 1 3 相等 因为底层是循环走的System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());System.out.println(group.next());// 3. 指定普通任务// group.next().submit(() -> System.out.println("ok"));// 4. 定时任务// 立即执行, 并且是1s一次group.next().scheduleAtFixedRate(() -> System.out.println("ok"), 0, 1, TimeUnit.SECONDS);}
}
执行结果:
ioty.channel.nio.NioEventLoop@3d8c7aca
ioty.channel.nio.NioEventLoop@5ebec15
ioty.channel.nio.NioEventLoop@3d8c7aca
ioty.channel.nio.NioEventLoop@5ebec15
ok
ok
ok
ok
关闭EventLoopGroup
优雅的关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup到关闭状态从而拒绝新的任务加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的.
处理IO任务
服务器代码
package com.xlgponentty.eventLoop;import static com.xlgponent.nio.TestCommon.NETTY_SERVER_PORT;import java.nio.charset.StandardCharsets;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import ioty.bootstrap.ServerBootstrap;
import ioty.buffer.ByteBuf;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import ioty.channel.ChannelInitializer;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import ioty.channel.socket.nio.NioSocketChannel;/*** @author wangqingwei* Created on 2022-06-08*/
public class EventLoopServer {private static final Logger logger = LoggerFactory.getLogger(EventLoopServer.class);public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(StandardCharsets.UTF_8));}});}}).bind(NETTY_SERVER_PORT);}
}
客户端代码
package com.xlgponentty.eventLoop;import static com.xlgponent.nio.TestCommon.LOCAL_HOST;
import static com.xlgponent.nio.TestCommon.NETTY_SERVER_PORT;import java.InetSocketAddress;import ioty.bootstrap.Bootstrap;
import ioty.channel.Channel;
import ioty.channel.ChannelInitializer;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioSocketChannel;
import ioty.handler.codec.string.StringEncoder;/*** @author wangqingwei* Created on 2022-06-08*/
public class EventLoopClient {public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT)).sync().channel();System.out.println(channel);System.out.println("--");channel.writeAndFlush("asdfasf");}
}
分工
Bootstrap 的group方法可以传入两个 EventLoopGroup参数,分别boss处理accept事件,work处理读写
new ServerBootstrap()// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))...
多个客户端分别发送 hello 结果.
nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4
其实可以看到, 一个EventLoop 可以负责多个Channel,且EventLoop 一旦与Channel绑定,则一直负责该Channel的事件.
增加自定义EventLoopGroup
问题:
当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup, 避免同一个NioEventLoop 中的其他Channel在较长时间内都无法得到处理
package com.xlgponentty.eventLoop;import static com.xlgponent.nio.TestCommon.NETTY_SERVER_PORT;import java.nio.charset.StandardCharsets;import ioty.bootstrap.ServerBootstrap;
import ioty.buffer.ByteBuf;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import ioty.channel.ChannelInitializer;
import ioty.channel.DefaultEventLoopGroup;
import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import ioty.channel.socket.nio.NioSocketChannel;/*** @author wangqingwei* Created on 2022-06-09*/
public class MyServer {public static void main(String[] args) {// 增加自定义的非NioEventLoopGroupEventLoopGroup group = new DefaultEventLoopGroup();new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {// 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + ": " + buf.toString(StandardCharsets.UTF_8));// 调用下一个handlerctx.fireChannelRead(msg);}})// 该handler绑定自定义的Group.addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(Thread.currentThread().getName() + ": " + buf.toString(StandardCharsets.UTF_8));}});}}).bind(NETTY_SERVER_PORT);}
}
启动三个客户端发送数据
nioEventLoopGroup-4-1: asdf
defaultEventLoopGroup-2-1: asdf
nioEventLoopGroup-4-2: hel2
defaultEventLoopGroup-2-2: hel2
nioEventLoopGroup-4-1: adf333
defaultEventLoopGroup-2-3: adf333
可以看出,客户端与服务器之间的事件,被nioEventLoopGroup 和 defaultEventLoopGroup分别处理
切换的实现
**不同的EventLoopGroup 切换的实现原理如下: **
由上面的图可以看出,当handler 中绑定的Group不同时,需要切换Group来执行不同的任务
ioty.channel.AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 获取下一个EventLoop, excutor 即为EventLoopGroupEventExecutor executor = next.executor();// 如果下一个EventLoop 在当前的 EVentLoopGroup 中if (executor.inEventLoop()) {// 使用当前的 EventLoopGroup 中的 EventLoop 来处理任务next.invokeChannelRead(m);} else {// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行executor.executor(new Runnable(){public void run() {next.invokeChannelRead(m);}});}
}
- 如果两个handler 绑定的是同一个 EventLoopGroup, 就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个handler 的EventLoopGroup 来调用.
二、Channel
Channel 的常用方法
- close() 可以用来关闭Channel
- closeFuture() 用来处理 Channel 的关闭
- sync 方法作用是同步等待channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipline() 方法用于添加处理器
- write() 方法将数据写入
- 因为缓冲机制,数据被写入到channel 中以后,不会立即被发送
- 只有当缓冲满了 或者 调用了flush() 方法后,才会将数据通过channel 发送出去
- writeAndFlush() 方法将数据写入并立即发送 (刷出)
ChannelFuture
获取channel
同步异步获取建立连接后的 channel 和 发送数据.
package com.xlgponentty.eventLoop;import static com.xlgponent.nio.TestCommon.LOCAL_HOST;
import static com.xlgponent.nio.TestCommon.NETTY_SERVER_PORT;import java.InetSocketAddress;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import ioty.bootstrap.Bootstrap;
import ioty.channel.Channel;
import ioty.channel.ChannelFuture;
import ioty.channel.ChannelFutureListener;
import ioty.channel.ChannelInitializer;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioSocketChannel;
import ioty.handler.codec.string.StringEncoder;/*** @author wangqingwei* Created on 2022-06-08*/
public class EventLoopClient {private static final Logger logger = LoggerFactory.getLogger(EventLoopClient.class);public static void main(String[] args) throws InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}})// 1. 建立链接// 异步非阻塞, main发起了调用, 真正执行的是 nio 线程.NioEventLoopGroup.connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT));// 2.1 使用sync同步处理结果// 阻塞住当前线程, 直到nio线程连接完毕. 如果去掉sync此时, 由于连接未建立完毕, main执行, channel无法给服务器写数据
// channelFuture.sync();
// Channel channel = channelFuture.channel();
// System.out.println(channel);
// System.out.println("--");
// channel.writeAndFlush("hello world");// 2.2 异步addListener(回调对象)channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {final Channel channel = future.channel();logger.debug("{}", channel);channel.writeAndFlush("hello world !!");}});}
}
正确关闭
package com.xlgponentty.eventLoop;import static com.xlgponent.nio.TestCommon.LOCAL_HOST;
import static com.xlgponent.nio.TestCommon.NETTY_SERVER_PORT;import java.InetSocketAddress;
import java.util.Scanner;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import ioty.bootstrap.Bootstrap;
import ioty.channel.Channel;
import ioty.channel.ChannelFuture;
import ioty.channel.ChannelFutureListener;
import ioty.channel.ChannelInitializer;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioSocketChannel;
import ioty.handler.codec.string.StringEncoder;
import ioty.handler.logging.LogLevel;
import ioty.handler.logging.LoggingHandler;/*** @author wangqingwei* Created on 2022-06-08*/
public class CloseFutureClient {private static final Logger logger = LoggerFactory.getLogger(CloseFutureClient.class);public static void main(String[] args) throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new StringEncoder());}})// 1. 建立链接// 异步非阻塞, main发起了调用, 真正执行的是 nio 线程.NioEventLoopGroup.connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT));Channel channel = channelFuture.sync().channel();new Thread(() -> {Scanner scanner = new Scanner(System.in);while (true) {String line = scanner.nextLine();if ("q".equals(line)) {// 还是异步的, 关闭交给另外一个线程处理channel.close();// thread1 输出
// logger.debug("关闭之后的操作!!1");break;}channel.writeAndFlush(line);}}, "thread1").start();
// logger.debug("main end ");// 获取closeFuture 关闭后的处理 1. 同步 2.异步ChannelFuture closeFuture = channel.closeFuture();
// logger.debug("waiting close....");
// closeFuture.sync();
// logger.debug("处理关闭之后的操作...");closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {logger.debug("处理关闭之后的操作...");// 优雅的关闭NioEventLoopGroup 里面还有其他线程group.shutdownGracefully();}});}
}
提升的是 吞吐量: 单位时间内处理的请求个数
三、Future & Promise
在异步处理时,经常用到这两个接口
netty 的Future 和 jdk 中Future ,但是是两个接口,属于继承。而Promise 对netty中future进行了扩展
- jdk Future 只能同步等待任务结束 (成功 或者 失败 ) 才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束.
- netty Promise 不仅有netty future的能力,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
| 功能/名称 | jdk Future | netty Future | Promise |
| — | — | — | — |
| cancel | 取消任务 | - | - |
| isCanceled | 任务是否取消 | - | - |
| isDone | 任务是否完成,不能区分成功失败 | - | - |
| get | 获取任务结果,阻塞等待 | - | - |
| getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
| await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
| sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
| isSuccess | - | 判断任务是否成功 | - |
| cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
| addLinstener | - | 添加回调,异步接收结果 | - |
| setSuccess | - | - | 设置成功结果 |
| setFailure | - | - | 设置失败结果 |
jdk future
package com.xlgponentty.FutureAndPromise;import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;/*** @author wangqingwei* Created on 2022-06-11*/
public class TestJDKFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 线程池ExecutorService executorService = Executors.newFixedThreadPool(2);//2. 执行Future<Integer> future = executorService.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {System.out.println("执行计算!!1");Thread.sleep(1000);return 50;}});//3. 获取结果System.out.println("等待结果!11");System.out.println(future.get());}
}
netty future
package com.xlgponentty.FutureAndPromise;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import ioty.channel.EventLoop;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.util.concurrent.Future;/*** @author wangqingwei* Created on 2022-06-11*/
public class TestNettyFuture {private static final Logger logger = LoggerFactory.getLogger(TestNettyFuture.class);public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop loop = group.next();//2. 执行Future<Integer> future = loop.submit(() -> {logger.debug("执行计算中!");Thread.sleep(1000);return 50;});//3. 获取结果logger.debug("等待结果!1!");// 立即返回结果logger.debug("{}", future.getNow());// 同步返回结果 也就是阻塞
// logger.debug("{}", future.get());// 异步返回结果future.addListener(future1 -> logger.debug("接受结果: {}", future1.getNow()));}
}
netty promise
package com.xlgponentty.FutureAndPromise;import java.util.concurrent.ExecutionException;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import ioty.channel.EventLoop;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.util.concurrent.DefaultPromise;/*** @author wangqingwei* Created on 2022-06-11*/
public class TestNettyPromise {private static final Logger logger = LoggerFactory.getLogger(TestNettyPromise.class);public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop loop = group.next();DefaultPromise<Object> promise = new DefaultPromise<>(loop);new Thread(() -> {logger.debug("开始计算!!");try {int i = 1 / 0;Thread.sleep(1000);promise.setSuccess(50);} catch (Exception e) {promise.setFailure(e);}}).start();logger.debug("等待结果!!");// logger.debug("接受结果: {}", promise.get());logger.debug("接受结果: {}", promise.isSuccess());}
}
四、handler & Pipline
代码:
package com.xlgponentty.Pipeline;import static com.xlgponent.nio.TestCommon.NETTY_SERVER_PORT;import java.nio.charset.StandardCharsets;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import ioty.bootstrap.ServerBootstrap;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import ioty.channel.ChannelInitializer;
import ioty.channel.ChannelOutboundHandlerAdapter;
import ioty.channel.ChannelPipeline;
import ioty.channel.ChannelPromise;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import ioty.channel.socket.nio.NioSocketChannel;/*** @author wangqingwei* Created on 2022-06-12*/
public class TestPipelineServer {private static final Logger logger = LoggerFactory.getLogger(TestPipelineServer.class);public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// socket中 是有pipeline类似一个处理流 双向链表// 默认有head -> tail 插入的分为入站和出站// head -> h1 -> h2 -> h3 -> h4 -> tailChannelPipeline pipeline = ch.pipeline();// 入站的handler处理完数据, 还可以把处理的数据 msg 给到下一个hander// 调用后一个handler处理: ctx.fireChannelRead(msg); == super.channelRead(ctx, msg);pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("1");super.channelRead(ctx, "1 -> 变更数据");}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.debug("2, msg = {}", msg);// 3. 执行write写出数据, 这样出站handler才能执行ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes(StandardCharsets.UTF_8)));super.channelRead(ctx, msg);}});// 出站的顺序是从tail向前的pipeline.addLast("h3", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception {logger.debug("3");super.write(ctx, msg, promise);}});pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Exception {logger.debug("4");super.write(ctx, msg, promise);}});}}).bind(NETTY_SERVER_PORT);}
}
运行结果:
2022-06-12 16:47:31,253 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 47] 1
2022-06-12 16:47:31,253 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 54] 2, msg = 1 -> 变更数据
2022-06-12 16:47:31,256 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 75] 4
2022-06-12 16:47:31,257 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 67] 3当然因为有LoggerHanlder 在客户端, 有server给客户端发的数据. server...
- handler 是可以取名的addLast(“h3”, new ChannelOutboundHandlerAdapter(){
- pipeline是一个双向链表, 默认head -> tail 两个节点
- 通过ctx.fireChannelRead(msg) 等方法,将当前handler的处理结果传递给下一个handler
- **入站 (Inbound) ,**从head 向后,直到不是Inbound
- 出站 (outbound), 从tail向前调用handler, 直到不是outbound,但是需要有触发.
- channel.writeAndFlush(ctx.alloc().buffer().writeBytes(“server…”.getBytes(
StandardCharsets.UTF_8))); 从tail向前走找outbound - ctx.writeAndFlush(), 从当前handler向前找outbound
- channel.writeAndFlush(ctx.alloc().buffer().writeBytes(“server…”.getBytes(
具体的结构
调用顺序
EmbeddedChannel
embeddedChannel 可以用于测试各个handler, 通过其构造函数按照顺序传入需要测试的handler, 然后调用对应的Inbound 和 Outbound方法即可.
public class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("4");super.write(ctx, msg, promise);}};// 用于测试Handler的ChannelEmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);// 执行Inbound操作 channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));// 执行Outbound操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));}
}
五、ByteBuf
调试工具方法
public static void log(ByteBuf buffer) {int length = buffer.readableBytes();int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;StringBuilder buf = new StringBuilder(rows * 80 * 2).append("read index:").append(buffer.readerIndex()).append(" write index:").append(buffer.writerIndex()).append(" capacity:").append(buffer.capacity()).append(NEWLINE);appendPrettyHexDump(buf, buffer);System.out.println(buf.toString());
}
该方法可以帮助我们更为详细地查看ByteBuf中的内容
创建
package com.xlgponentty.ByteBuf;import java.nio.charset.StandardCharsets;import org.junit.Test;import com.xlgponent.ks.utils.ByteBufUtl;import ioty.buffer.ByteBuf;
import ioty.buffer.ByteBufAllocator;/*** @author wangqingwei* Created on 2022-06-12*/
public class ByteBufTest {@Testpublic void byteBufStudy(){ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();// initByteBufUtl.log(buffer);StringBuilder stringBuilder = new StringBuilder();for (int i = 0; i < 300; i++) {stringBuilder.append("a");}buffer.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));ByteBufUtl.log(buffer);}}
运行结果
结果:
ead index:0 write index:0 capacity:256read index:0 write index:300 capacity:512
...
通过** ByteBufAllocator.DEFAULT.buffer() ** 创建byteBuf 默认大小 256,使用直接内存创建,也可以指定其大小.
并且我我们来看,其有扩容机制,这一点是NIO所没有的.
如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建
直接内存与堆内存
@Testpublic void directAndHeapByteBuf(){// 默认是以直接内存ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);System.out.println(buffer.getClass());// 堆内存创建ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(16);System.out.println(buf.getClass());ByteBuf buffer2 = ByteBufAllocator.DEFAULT.directBuffer(16);System.out.println(buffer2.getClass());}
**结果: **
class ioty.buffer.PooledUnsafeDirectByteBuf // 池化直接内存
class ioty.buffer.PooledUnsafeHeapByteBuf // 池化堆内存
class ioty.buffer.PooledUnsafeDirectByteBuf
- 直接内存创建和销毁的代价昂贵,但读写性能高 (少一次内存复制), 适合配合池化功能一起用
- 直接内存对GC 压力小,因为这一部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放.
池化和非池化
优势: 重用ByteBuf
- 没有池化,每次都得创建新的byteBuf实例,这个操作对直接内存代价昂贵,就算采用对堆内存,会有GC压力
- 有了池化,重用池中byteBuf实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的问题
开启:
-Dioty.allocator.type={unpooled|pooled}
- 4.1 以后,非android 平台默认启用,android启用非池化
- 4.1 以前都是非池化
组成
ByteBuf 主要有以下几个组成部分
- 最大容量 与 当前容量
- 构造ByteBuf时 两个参数, 一个初始容量,一个最大容量。第二个参数默认为integer.MAX_VALUE
- 当byteBuf 容量无法容纳所有数据时,会进行扩容,但如果超过最大容量,会报出java.lang.IndexOutOfBoundsExecption异常
- 读写不同:
- ByteBuffer 只使用position 控制,而ByteBuf 分别由读指针和写指针控制。进行读写操作时,无需进行模式的切换.
写入
常用方法:
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian(大端写入),即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian(小端写入),即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 | CharSequence为字符串类的父类,第二个参数为对应的字符集 |
注意
- 这些方法的未指明返回值的,其返回值都是byteBuf, 可以链式带哦用来写入不同的数据
- 网络传输中,默认习惯是 Big Endian, 使用 writeInt(int value)
使用方法
public class ByteBufStudy {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);ByteBufUtil.log(buffer);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4});ByteBufUtil.log(buffer);buffer.writeInt(5);ByteBufUtil.log(buffer);buffer.writeIntLE(6);ByteBufUtil.log(buffer);buffer.writeLong(7);ByteBufUtil.log(buffer);}
}
运行结果
read index:0 write index:0 capacity:16read index:0 write index:4 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+read index:0 write index:8 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+read index:0 write index:12 capacity:16+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 |............ |
+--------+-------------------------------------------------+----------------+read index:0 write index:20 capacity:20+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
扩容
扩容规则:
- 写入后未超过 512 字节,则选下一个16的整数倍扩容。
- 例如 写入后12 个字节,则扩容后 capacity是16字节
- 写入后超过 512字节,则选 2^n
- 例如写入后 514, 则扩容后 2^10 = 1024 字节.
- 当前扩容不能超过 maxCapacity
读取
还有方法是采用get开头的一系列方法,这些方法不会改变read index
回收/释放
retain& release
由于Netty 中有堆外内存(直接内存)的 byteBuf 实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。
- UnpooledHeapByteBuf 使用的是JVM 内存,只需等GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里使用了引用计数法来控制 回收内存,每个ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为1
- 调用release 方法计数减1,如果计数为0,ByteBuf内存被回收
- 调用 retain 方法计数+1,表示调用者没有用完之前,其他handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使ByteBuf 对象还在,其各个方法均无法正常使用
释放规则
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则: 谁是最后使用者,谁负责release
- 起点,对于NIO 实现来讲,在ioty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
- 入站 byteBuf 处理规则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 byteBuf 转换为其他类型的 java 对象,这时ByteBuf 就没有用了,必须release
- 如果不调用** ctx.fireChannelRead(msg)向后传递,那么也必须release**
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
- 出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由HeadContext flush后release
HeadContext.write @Overridepublic final void write(Object msg, ChannelPromise promise) {assertEventLoop();ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;if (outboundBuffer == null) {// If the outboundBuffer is null we know the channel was closed and so// need to fail the future right away. If it is not null the handling of the rest// will be done in flush0()// See (promise, newClosedChannelException(initialCloseCause));// release message now to prevent resource-leakReferenceCountUtil.release(msg);return;}
- 异常处理原则
- 有时候不清楚ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回true
while (!buffer.release()) {}
当ByteBuf 被传到了pipline的head 与tail 时,ByteBuf 会被其中的方法彻底释放,但前提是ByteBuf 被传递到了head 与 tail中
TailContext 释放源码
protected void onUnhandledInboundMessage(Object msg) {try {logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);} finally {// 具体的释放方法ReferenceCountUtil.release(msg);}
}//判断是否是 ByteBuf 因为ByteBuf 实现了ReferenceCounted接口.
public static boolean release(Object msg) {return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
切片
ByteBuf 切片是 【零拷贝】 的体现之一,对原始ByteBuf 进行切片成多个 ByteBuf ,切片后的ByteBuf 并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf 维护对应的read、write指针
得到分片后的 buffer后,要调用其retain 方法,使其内部的引用计数 + 1。避免原ByteBuf 释放,导致切片也无法使用。
修改原ByteBuf中的值,也会影响切片后得到的ByteBuf
package com.xlgponentty.ByteBuf;import com.xlgponent.ks.utils.ByteBufUtil;import ioty.buffer.ByteBuf;
import ioty.buffer.ByteBufAllocator;
import ioty.buffer.CompositeByteBuf;public class TestSlice {public static void main(String[] args) {// 创建ByteBufByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);// 向buffer中写入数据buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});// 将buffer分成两部分ByteBuf slice1 = buffer.slice(0, 5);ByteBuf slice2 = buffer.slice(5, 5);// 需要让分片的buffer引用计数加一// 避免原Buffer释放导致分片buffer无法使用slice1.retain();slice2.retain();ByteBufUtil.log(slice1);ByteBufUtil.log(slice2);// 更改原始buffer中的值System.out.println("===========修改原buffer中的值===========");buffer.setByte(0,5);System.out.println("===========打印slice1===========");ByteBufUtil.log(slice1);// 分片合并final CompositeByteBuf byteBufs = ByteBufAllocator.DEFAULTpositeBuffer(10);byteBufs.addComponents(true, slice1, slice1);ByteBufUtil.log(slice1);}
}
当然还有 合并这些分片的方法, 还有copy(复制数据,内存地址变化)等
优势
- 池化思想-- 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能。
- **读写指针分离 **,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更加流畅
- 很多地方体现零拷贝,比如
- slice、duplicate、compositeByteBuf
更多推荐
【Netty 几个组件介绍】
发布评论