springboot集成netty开发服务端和客户端

编程入门 行业动态 更新时间:2024-10-08 00:33:38

springboot集成netty开发<a href="https://www.elefans.com/category/jswz/34/1769467.html">服务端和客户端</a>

springboot集成netty开发服务端和客户端

maven里面引入netty依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>

创建NettyServer类

package com.NettyServer.service;import com.NettyServermon.ServerChannelInitializer;
import ioty.bootstrap.ServerBootstrap;
import ioty.channel.ChannelFuture;
import ioty.channel.ChannelOption;
import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.InetSocketAddress;@Component
@Slf4j
public class NettyServer {/*** boss 线程组用于TCP处理连接工作*/private final EventLoopGroup boss = new NioEventLoopGroup();/*** work 线程组用于IO数据处理*/private final EventLoopGroup work = new NioEventLoopGroup();@Value("${netty.port}")private Integer port;/*** 启动Netty Server*/@PostConstructpublic void start() throws InterruptedException {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, work)// 指定Channel.channel(NioServerSocketChannel.class)//使用指定的端口设置套接字地址.localAddress(new InetSocketAddress(port))//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 1024)//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文.childOption(ChannelOption.SO_KEEPALIVE, true)//将小的数据包包装成更大的帧进行传送,提高网络的负载.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ServerChannelInitializer());ChannelFuture future = bootstrap.bind().sync();if (future.isSuccess()) {log.info("已启动 Netty Server");}}@PreDestroypublic void destory() throws InterruptedException {boss.shutdownGracefully().sync();work.shutdownGracefully().sync();log.info("关闭Netty");}

创建ServerChannelInitializer类

package com.NettyServermon;import com.NettyServer.service.NettyServerHandler;
import ioty.channel.ChannelInitializer;
import ioty.channel.socket.SocketChannel;
import ioty.handler.codec.serialization.ClassResolvers;
import ioty.handler.codec.serialization.ObjectDecoder;
import ioty.handler.codec.serialization.ObjectEncoder;public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) {//添加编解码socketChannel.pipeline().addLast(new ObjectDecoder(10 * 1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));socketChannel.pipeline().addLast(new ObjectEncoder());socketChannel.pipeline().addLast(new NettyServerHandler());}}

创建NettyServerHandler类

package com.NettyServer.service;import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelId;
import ioty.channel.ChannelInboundHandlerAdapter;
import ioty.handler.timeout.IdleState;
import ioty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {private static NettyServerHandler handler;/*** 管理一个全局map,保存连接进服务端的通道数量*/private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();//@Autowired//private IBaseAttachmentService attachmentService;@PostConstructpublic void init() {handler = this;}/*** @param ctx* @author xxx on 2021/4/28 16:10* @DESCRIPTION: 有客户端连接服务器会触发此函数* @return: void*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = insocket.getAddress().getHostAddress();int clientPort = insocket.getPort();//获取连接通道唯一标识ChannelId channelId = ctx.channel().id();//如果map中不包含此连接,就保存连接if (CHANNEL_MAP.containsKey(channelId)) {log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());} else {//保存连接CHANNEL_MAP.put(channelId, ctx);log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");log.info("连接通道数量: " + CHANNEL_MAP.size());}}/*** @param ctx* @author xxx on 2021/4/28 16:10* @DESCRIPTION: 有客户端终止连接服务器会触发此函数* @return: void*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = insocket.getAddress().getHostAddress();ChannelId channelId = ctx.channel().id();//包含此客户端才去删除if (CHANNEL_MAP.containsKey(channelId)) {//删除连接CHANNEL_MAP.remove(channelId);log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");log.info("连接通道数量: " + CHANNEL_MAP.size());}}/*** @param ctx* @author xxx on 2021/4/28 16:10* @DESCRIPTION: 有客户端发消息会触发此函数* @return: void*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("【" + ctx.channel().id() + "】" + " :" + msg.toString());//可以在这个地方写业务处理逻辑入库或者什么的...................................// 结果写到客户端/***  
下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数**///响应客户端this.channelWrite(ctx.channel().id(), msg);}/*** @param msg       需要发送的消息内容* @param channelId 连接通道唯一id* @author xxx 2021/05/14 16:10* @DESCRIPTION: 服务端给客户端发送消息* @return: void*/public void channelWrite(ChannelId channelId, Object msg) throws Exception {ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);if (ctx == null) {log.info("通道【" + channelId + "】不存在");return;}if (msg == null || msg == "") {log.info("服务端响应空的消息");return;}//将客户端的信息直接返回写入ctxctx.write(msg);//刷新缓存区ctx.flush();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {String socketString = ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.info("Client: " + socketString + " READER_IDLE 读超时");ctx.disconnect();} else if (event.state() == IdleState.WRITER_IDLE) {log.info("Client: " + socketString + " WRITER_IDLE 写超时");ctx.disconnect();} else if (event.state() == IdleState.ALL_IDLE) {log.info("Client: " + socketString + " ALL_IDLE 总超时");ctx.disconnect();}}}/*** @param ctx* @author xxx on 2021/4/28 16:10* @DESCRIPTION: 发生异常会触发此函数* @return: void*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println();ctx.close();log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());//cause.printStackTrace();}}

下面贴出来客户端代码

创建NettyClient类

package com.NettyClient.service;import com.NettyClientmon.NettyClientInitializer;
import ioty.bootstrap.Bootstrap;
import ioty.channel.ChannelFuture;
import ioty.channel.ChannelFutureListener;
import ioty.channel.ChannelOption;
import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j
@Component
public class NettyClient {static final String HOST = "127.0.0.1";static final int PORT = 8899;private EventLoopGroup group;private Bootstrap b;private ChannelFuture cf;private NettyClientInitializer nettyClientInitializer;public NettyClient() {nettyClientInitializer = new NettyClientInitializer();group = new NioEventLoopGroup();b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).handler(nettyClientInitializer);}public void connect() {try {this.cf = b.connect(HOST, PORT).sync();} catch (InterruptedException e) {log.error("客户端连接服务端异常:" + e);}}public ChannelFuture getChannelFuture() {if (this.cf == null) {this.connect();}if (!this.cf.channel().isActive()) {this.connect();}return this.cf;}public void close() {try {this.cf.channel().closeFuture().sync();this.group.shutdownGracefully();} catch (InterruptedException e) {e.printStackTrace();}}public void setMessage(String msg) throws InterruptedException {ChannelFuture cf = this.getChannelFuture();cf.channel().writeAndFlush(msg);}public static void main(String[] args) {try {} catch (Exception e) {log.error("异常:" + e);}}}

创建NettyClientHandler类

package com.NettyClient.service;import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {private String result;@Overridepublic void channelActive(ChannelHandlerContext ctx) {}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {//if (!ObjectUtils.isEmpty(msg)) {result = (String) msg;//}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}

创建NettyClientInitializer类

package com.NettyClientmon;import com.NettyClient.service.NettyClientHandler;
import ioty.channel.ChannelInitializer;
import ioty.channel.socket.SocketChannel;
import ioty.handler.codec.serialization.ClassResolvers;
import ioty.handler.codec.serialization.ObjectDecoder;
import ioty.handler.codec.serialization.ObjectEncoder;
import ioty.handler.timeout.ReadTimeoutHandler;
import org.springframework.beans.factory.annotation.Autowired;public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));socketChannel.pipeline().addLast(new ObjectEncoder());socketChannel.pipeline().addLast(new NettyClientHandler());}}

客户端可以直接在业务代码里调用发送方法,把数据发送到服务端。下面是我写的一个用定时任务测试发送的简单例子:

package com.NettyClient;import com.NettyClient.service.NettyClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;@Component
@Slf4j
@EnableAsync
public class TaskManager {@Autowiredprivate NettyClient nettyClient;@Async@Scheduled(cron = "0 */1 * * * ?")public void test() throws InterruptedException {try {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");nettyClient.setMessage("客户端发送此数据" + df.format(new Date()));} catch (Exception e) {log.error("异常:" + e);}}@Async@Scheduled(cron = "0 */1 * * * ?")public void test2() throws InterruptedException {try {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");nettyClient.setMessage("客户端2发送此数据" + df.format(new Date()));} catch (Exception e) {log.error("异常:" + e);}}}

 

 

 

 

 

 

 

 

 

 

更多推荐

springboot集成netty开发服务端和客户端

本文发布于:2024-03-05 21:49:14,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1713537.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:服务端   客户端   springboot   netty

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!