SpringBoot 集成 Netty

编程入门 行业动态 更新时间:2024-10-19 15:25:45

<a href=https://www.elefans.com/category/jswz/34/1769943.html style=SpringBoot 集成 Netty"/>

SpringBoot 集成 Netty

SpringBoot集成Netty

    • 1、POM
    • 2、服务端
      • 2.1、netty服务端处理类
      • 2.2、服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
      • 2.3、Netty服务启动类
    • 3、客户端
      • 3.1、客户端处理器
      • 3.2、客户端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器,客户端服务端编解码要一致
      • 3.3、客户端

1、POM

<!-- 添加SpringBoot父类依赖 --><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.1.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- json jar --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.22</version></dependency><!-- lombok的主要作用是通过一些注解,消除样板式代码ps:对bean的简化 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--    Netty    --><dependency><groupId>ioty</groupId><artifactId>netty-all</artifactId><version>4.1.32.Final</version></dependency></dependencies>

2、服务端

2.1、netty服务端处理类

import com.huayuemon.util.SpringBeanUtils;
import com.huayue.yhhjkj.service.YHHJKJService;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelId;
import ioty.channel.ChannelInboundHandlerAdapter;
import ioty.handler.timeout.IdleState;
import ioty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import java.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;/*** @description: netty服务端处理类**/@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 管理一个全局map,保存连接进服务端的通道数量*/private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();/*** @DESCRIPTION: 有客户端连接服务器会触发此函数* @return: void*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = insocket.getAddress().getHostAddress();int clientPort = insocket.getPort();//获取连接通道唯一标识ChannelId channelId = ctx.channel().id();System.out.println();//如果map中不包含此连接,就保存连接if (CHANNEL_MAP.containsKey(channelId)) {log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());} else {//保存连接CHANNEL_MAP.put(channelId, ctx);log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");log.info("连接通道数量: " + CHANNEL_MAP.size());}}/*** @DESCRIPTION: 有客户端终止连接服务器会触发此函数* @return: void*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = insocket.getAddress().getHostAddress();ChannelId channelId = ctx.channel().id();//包含此客户端才去删除if (CHANNEL_MAP.containsKey(channelId)) {//删除连接CHANNEL_MAP.remove(channelId);System.out.println();log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");log.info("连接通道数量: " + CHANNEL_MAP.size());}}/*** @DESCRIPTION: 有客户端发消息会触发此函数* @return: void*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//        log.info("加载客户端报文......");
//        log.info("【" + ctx.channel().id() + "】" + " :" + msg);//响应客户端String response = "TCPServer response:" + msg;System.out.println(response);this.channelWrite(ctx.channel().id(), response);}/*** @param msg       需要发送的消息内容* @param channelId 连接通道唯一id* @DESCRIPTION: 服务端给客户端发送消息* @return: void*/public void channelWrite(ChannelId channelId, Object msg) throws Exception {ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);if (ctx == null) {log.info("通道【" + channelId + "】不存在");return;}if (msg == null || msg == "") {log.info("服务端响应空的消息");return;}//将客户端的信息直接返回写入ctxctx.write(msg);//刷新缓存区ctx.flush();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {String socketString = ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.info("Client: " + socketString + " READER_IDLE 读超时");ctx.disconnect();} else if (event.state() == IdleState.WRITER_IDLE) {log.info("Client: " + socketString + " WRITER_IDLE 写超时");ctx.disconnect();} else if (event.state() == IdleState.ALL_IDLE) {log.info("Client: " + socketString + " ALL_IDLE 总超时");ctx.disconnect();}}}/*** @DESCRIPTION: 发生异常会触发此函数* @return: void*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println();ctx.close();log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());//cause.printStackTrace();}
}

2.2、服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器


import ioty.buffer.ByteBuf;
import ioty.buffer.Unpooled;
import ioty.channel.ChannelInitializer;
import ioty.channel.socket.SocketChannel;
import ioty.handler.codec.DelimiterBasedFrameDecoder;
import ioty.handler.codec.string.StringDecoder;
import ioty.handler.codec.string.StringEncoder;
import ioty.util.CharsetUtil;/*** @description: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器**/public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//以换行符作为分隔符ByteBuf buf= Unpooled.copiedBuffer("\n".getBytes());//分隔符解码器channel.pipeline().addLast(new DelimiterBasedFrameDecoder(10240,true,buf));channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));channel.pipeline().addLast(new NettyServerHandler());}}

2.3、Netty服务启动类

import ioty.bootstrap.ServerBootstrap;
import ioty.channel.ChannelFuture;
import ioty.channel.ChannelOption;
import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.InetSocketAddress;/*** @description: netty服务启动类**/@Slf4j
@Component
public class NettyServer {public void start(InetSocketAddress address) {//配置服务端的NIO线程组// 用于接受客户端连接的请求 (并没有处理请求)EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 用于处理客户端连接的读写操作EventLoopGroup workerGroup = new NioEventLoopGroup(2);try {ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)  // 绑定线程池.channel(NioServerSocketChannel.class).localAddress(address).childHandler(new NettyServerChannelInitializer())//编码解码.option(ChannelOption.SO_BACKLOG, 256)  //  还没有被accept 取出的连接.childOption(ChannelOption.SO_KEEPALIVE, true);  //保持长连接,2小时无数据激活心跳机制// 绑定端口,开始接收进来的连接ChannelFuture future = bootstrap.bind(address).sync();log.info("netty服务器开始监听端口:" + address.getPort());//等待服务端监听端口关闭 服务器同步连接断开时,这句代码才会往下执行future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();//优雅退出,释放 NIO 线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

3、客户端

3.1、客户端处理器

import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelId;
import ioty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ConcurrentHashMap;/*** @description: 客户端处理类**/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 计算有多少客户端接入,第一个string为客户端ip*/private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CLIENT_MAP = new ConcurrentHashMap<>();@Overridepublic void channelActive(ChannelHandlerContext ctx) {CLIENT_MAP.put(ctx.channel().id(), ctx);log.info("ClientHandler Active");}/*** @param ctx* @author xiongchuan on 2019/4/28 16:10* @DESCRIPTION: 有服务端端终止连接服务器会触发此函数* @return: void*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {ctx.close();log.info("服务端终止了服务");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {log.info("回写数据:" + msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {//cause.printStackTrace();log.info("服务端发生异常【" + cause.getMessage() + "】");ctx.close();}/*** @param msg       需要发送的消息内容* @param channelId 连接通道唯一id* @author xiongchuan on 2019/4/28 16:10* @DESCRIPTION: 客户端给服务端发送消息* @return: void*/public void channelWrite(ChannelId channelId, String msg) {ChannelHandlerContext ctx = CLIENT_MAP.get(channelId);if (ctx == null) {log.info("通道【" + channelId + "】不存在");return;}//将客户端的信息直接返回写入ctxctx.write(msg + " 时间:" + System.currentTimeMillis());//刷新缓存区ctx.flush();}
}

3.2、客户端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器,客户端服务端编解码要一致

import ioty.channel.ChannelInitializer;
import ioty.channel.socket.SocketChannel;
import ioty.handler.codec.string.StringDecoder;
import ioty.handler.codec.string.StringEncoder;
import ioty.util.CharsetUtil;/*** @description: 客户端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器,客户端服务端编解码要一致**/public class NettyClientChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));channel.pipeline().addLast(new NettyServerHandler());}
}

3.3、客户端

import ioty.bootstrap.Bootstrap;
import ioty.channel.*;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.SocketChannel;
import ioty.channel.socket.nio.NioSocketChannel;
import ioty.handler.codec.string.StringDecoder;
import ioty.handler.codec.string.StringEncoder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;/*** @description: 客户端**/@Slf4j
@Data
public class NettyClient implements Runnable {static final String HOST = System.getProperty("host", DefaultConstants.SOCKET_IP);static final int PORT = Integer.parseInt(System.getProperty("port", String.valueOf(DefaultConstants.SOCKET_PORT)));static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));private String content;public NettyClient(String content) {this.content = content;}@Overridepublic void run() {//        try {
//            Thread.sleep(5000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }// Configure the client.EventLoopGroup group = new NioEventLoopGroup();try {int num = 0;boolean boo =true;Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new NettyClientChannelInitializer() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast("decoder", new StringDecoder());p.addLast("encoder", new StringEncoder());p.addLast(new NettyClientHandler());}});ChannelFuture future = b.connect(HOST, PORT).sync();while (boo) {num++;String data="##0212QN=20201011184757000;ST=32;CN=2011;PW=123456;MN=2020001;Flag=0;CP=&&DataTime=20201011184757;011-Rtd=183.757,011-Flag=N;060-Rtd=0.805,060-Flag=N;003-Rtd=30,003-Flag=N;w10-Rtd=0,w10-Flag=N;y09-Rtd=2.31,y09-Flag=N&&\nfa\n";future.channel().writeAndFlush(data);//                try { //休眠一段时间
//                    Thread.sleep(500);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//
//                //每一条线程向服务端发送的次数if (num == 10) {boo = false;}System.out.println(content + "-----------------------------" + num);}log.info(content + "-----------------------------" + num);System.out.println(content + "-----------------------------" + num);//future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {group.shutdownGracefully();}}/***  下面是不加线程的*//*public static void main(String[] args) throws Exception {sendMessage("hhh你好?");}public static void sendMessage(String content) throws InterruptedException {// Configure the client.EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new NettyClientChannelInitializer() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast("decoder", new StringDecoder());p.addLast("encoder", new StringEncoder());p.addLast(new NettyClientHandler());}});ChannelFuture future = b.connect(HOST, PORT).sync();future.channel().writeAndFlush(content);future.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}*/
}

更多推荐

SpringBoot 集成 Netty

本文发布于:2023-11-15 18:08:47,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1603921.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:SpringBoot   Netty

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!