[Netty: From Getting Started to Giving Up] Forwarding TCP data to multiple clients with Netty


Table of Contents

  • Creating the database table
  • pom.xml
  • Entity class
  • Startup class
  • Thread class
  • Client code
  • Handler
  • Controller class
  • Caching TCP connections

I received a requirement to forward the TCP data of our communication module to other servers, in other words, to forward TCP data to multiple clients.

Task breakdown:

  • First, we need to create multiple clients; each client has its own clientId mapped to a dedicated TCP channel
  • We must be able to dynamically stop the forwarding task for a given clientId
  • When the service stops, all client connections must be closed to avoid wasting resources
  • Each client needs to reconnect automatically after a disconnect (to handle network outages)

Note: this article only implements the forwarding itself. Letting the receiving servers control the device in the reverse direction is not supported and would need special handling; if you are interested, leave me a comment.

Based on the results of this brainstorming, let's work out how to implement the process above.

Creating the database table

CREATE TABLE `station_message_transmit` (
  `id` bigint(32) NOT NULL COMMENT 'primary key',
  `station_id` int(11) NOT NULL COMMENT 'station id',
  `host` varchar(50) DEFAULT NULL COMMENT 'target host ip',
  `port` int(11) DEFAULT NULL COMMENT 'target port',
  `create_by` varchar(64) DEFAULT NULL COMMENT 'created by',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `update_by` varchar(64) DEFAULT NULL COMMENT 'updated by',
  `update_time` datetime DEFAULT NULL COMMENT 'update time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • All forwarded data is scoped to a single station (a single device)
  • The id is unique; later it is bound to a TCP channel so that we can send data, close the connection, and so on through it (a short usage sketch follows this list)
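As a quick preview of that binding (the cache class itself appears at the end of this article), the row id, used as the clientId, is meant to address a live connection roughly as below. This is only a sketch; ForwardingPreview and pushTo are hypothetical names used for illustration, not part of the original project.

package com.test;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;

/**
 * Hypothetical helper (sketch only): addresses a forwarding connection by the row id / clientId.
 */
public class ForwardingPreview {

    /** Push data to the forwarding target bound to the given id, if it is currently connected. */
    public static void pushTo(String id, byte[] payload) {
        ChannelHandlerContext ctx = BootNettyClientGroupCache.get(id); // cache class shown later in this article
        if (ctx != null) {
            ctx.writeAndFlush(Unpooled.buffer().writeBytes(payload)); // forward the data
            // ctx.close(); // closing the channel would stop forwarding to this target
        }
    }
}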

pom.xml

<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath />
    </parent>
    <groupId>boot.base.tcp.client</groupId>
    <artifactId>boot-example-base-tcp-client-2.0.5</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>boot-example-base-tcp-client-2.0.5</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- package as an executable jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
  • These are the only dependencies we need; this is just a simple demo to verify that the idea works.

Entity class

  • No yml configuration is needed; we keep everything minimal and use the default port 8080.

package com.test;

import lombok.Data;

/**
 * @author wu
 * @version 1.0
 * @date 2023/10/18 16:39
 */
@Data
public class StationMessageTransmit {

    /** unique id */
    private Long id;

    /** station id */
    private Integer stationId;

    /** target host ip */
    private String host;

    /** target port */
    private Integer port;
}

Startup class

package com.test;

import io.netty.channel.ChannelHandlerContext;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * wu
 */
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class BootNettyClientApplication implements CommandLineRunner, ApplicationListener<ApplicationEvent> {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(BootNettyClientApplication.class);
        app.run(args);
        System.out.println("Hello World!");
    }

    @Async
    @Override
    public void run(String... args) throws Exception {
        // Pseudo-code: in a real project these records would be loaded from station_message_transmit
        StationMessageTransmit tran = new StationMessageTransmit();
        tran.setId(1L);
        tran.setHost("192.168.10.128");
        tran.setPort(5000);
        tran.setStationId(13);

        StationMessageTransmit tran1 = new StationMessageTransmit();
        tran1.setId(2L);
        tran1.setHost("192.168.10.128");
        tran1.setPort(5001);
        tran1.setStationId(13);

        List<StationMessageTransmit> traces = new ArrayList<StationMessageTransmit>();
        traces.add(tran);
        traces.add(tran1);

        // One client thread per forwarding target
        for (StationMessageTransmit trace : traces) {
            BootNettyClientThread thread = new BootNettyClientThread(trace);
            thread.start();
        }
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        if (applicationEvent instanceof ContextClosedEvent) {
            System.out.println("application closed event");
            // Close every cached TCP connection when the application shuts down
            for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {
                ChannelHandlerContext channelHandlerContext = entry.getValue();
                if (channelHandlerContext != null) {
                    System.out.println("closing connection: " + entry.getKey());
                    channelHandlerContext.close();
                }
            }
        } else if (applicationEvent instanceof ContextRefreshedEvent) {
            System.out.println("application refreshed event");
        } else if (applicationEvent instanceof ContextStartedEvent) {
            System.out.println("application started event");
        } else if (applicationEvent instanceof ContextStoppedEvent) {
            System.out.println("application stopped event");
        }
    }
}
  • The work done in run() is pseudo-code: it simulates fetching the forwarding records from the database and then creates one client per record (a sketch of a real database query follows this list).

  • onApplicationEvent() listens for the application shutdown event. Because the TCP connections to the other servers are long-lived and never interrupted on their own, restarting the service without closing them would keep rebuilding connections, so every TCP client connection is closed when the close event fires.
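The hard-coded list in run() stands in for a database query. As a rough illustration of what that query could look like, here is a minimal sketch, assuming spring-boot-starter-jdbc is added to the pom and a DataSource is configured; the class name StationMessageTransmitDao and its methods are my own, not part of the original project.

package com.test;

import java.util.List;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

/**
 * Hypothetical DAO (sketch only): loads forwarding targets from station_message_transmit
 * instead of hard-coding them in run().
 */
@Repository
public class StationMessageTransmitDao {

    private final JdbcTemplate jdbcTemplate;

    public StationMessageTransmitDao(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Load every forwarding target. */
    public List<StationMessageTransmit> findAll() {
        return jdbcTemplate.query(
                "SELECT id, station_id, host, port FROM station_message_transmit",
                (rs, rowNum) -> {
                    StationMessageTransmit t = new StationMessageTransmit();
                    t.setId(rs.getLong("id"));
                    t.setStationId(rs.getInt("station_id"));
                    t.setHost(rs.getString("host"));
                    t.setPort(rs.getInt("port"));
                    return t;
                });
    }

    /** Whether the forwarding record with the given id still exists (used by the reconnect sketch later on). */
    public boolean existsById(Long id) {
        Integer count = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM station_message_transmit WHERE id = ?", Integer.class, id);
        return count != null && count > 0;
    }
}

With something like this in place, the for-loop in run() would simply iterate over the result of findAll() instead of the two hand-built objects.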

Thread class

package com.test;

/**
 * netty client thread
 * wu
 */
public class BootNettyClientThread extends Thread {

    private StationMessageTransmit trace;

    public BootNettyClientThread(StationMessageTransmit trace) {
        this.trace = trace;
    }

    @Override
    public void run() {
        try {
            new BootNettyClient().connect(trace);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
  • Passing the entity in keeps the clientId tied to its corresponding channel.

Client code

package com.test;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * netty client
 * wu
 */
public class BootNettyClient {

    private EventLoopGroup group;

    public void connect(StationMessageTransmit trace) throws Exception {
        // NIO thread group of the client
        group = new NioEventLoopGroup();
        try {
            // Bootstrap is the helper class for starting a client-side NIO service
            Bootstrap bootstrap = new Bootstrap();
            bootstrap = bootstrap.group(group);
            bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);
            // Set the I/O handlers: network I/O events, logging, message encoding and decoding
            bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024 * 1024));
                    socketChannel.pipeline().addLast(new StringDecoder());
                    socketChannel.pipeline().addLast(new TcpHandler(trace));
                }
            });
            // Connect to the server
            ChannelFuture future = bootstrap.connect(trace.getHost(), trace.getPort()).sync();
            if (future.isSuccess()) {
                System.out.println("netty client start success=" + trace.toString());
                // Wait until the connection is closed
                future.channel().closeFuture().sync();
            }
        } finally {
            // Exit and release resources
            group.shutdownGracefully().sync();
        }
    }
}

Handler

package com.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * I/O read/write handler
 * wu
 */
@ChannelHandler.Sharable
public class TcpHandler extends ChannelInboundHandlerAdapter {

    private static ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(5);

    private StationMessageTransmit trace;

    public TcpHandler(StationMessageTransmit trace) {
        this.trace = trace;
    }

    /**
     * Called whenever new data is received from the server
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
        if (msg == null) {
            return;
        }
        System.out.println("channelRead:read msg:" + msg.toString());
        // reply to the server
        // ctx.write("I got server message thanks server!");
    }

    /**
     * Called when a read from the server has completed
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        System.out.println("channelReadComplete");
        ctx.flush();
    }

    /**
     * Called when a Throwable is raised, i.e. when Netty hits an I/O error or a handler throws while processing an event
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
        System.out.println("exceptionCaught");
        cause.printStackTrace();
        ctx.close(); // on error, close the connection
    }

    /**
     * Called when the client first establishes a connection with the server
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelActive(ctx);
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        System.out.println("server ip:" + clientIp + ",clientId:" + trace.getId());
        // bind the clientId to this channel
        BootNettyClientGroupCache.save(trace.getId().toString(), ctx);
    }

    /**
     * Called when the client and the server disconnect
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelInactive(ctx);
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        ctx.close(); // must close on disconnect, otherwise resources are wasted
        System.out.println("channelInactive:" + clientIp);
        // client reconnect
        // reset();
    }

    /**
     * Client reconnect
     */
    public void reset() {
        // pseudo-code: check whether the forwarding record for this id still exists;
        // if it does not, the forwarding task should not keep running
        SCHEDULED_EXECUTOR.schedule(() -> {
            try {
                System.err.println("cannot reach the server, starting reconnect...");
                new BootNettyClient().connect(trace);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, 3, TimeUnit.SECONDS);
    }
}
  • reset() implements the client-side reconnect; it retries 3 seconds after the disconnect (a sketch of the existence check it hints at follows this list)
  • channelInactive() is triggered when the client and the server disconnect
  • channelActive() is triggered when the client and the server establish a connection; this is where the clientId is bound to the channel so that data can be written back later
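The existence check that reset() only mentions in a comment could be filled in roughly as below. This is a sketch, not the author's implementation; it assumes the hypothetical StationMessageTransmitDao from the earlier sketch (here a transmitDao field) has been made available to the handler.

/**
 * Hypothetical variant of reset(): only reconnect if the forwarding record still exists.
 * transmitDao and existsById(...) are assumptions, not part of the original article.
 */
public void reset() {
    SCHEDULED_EXECUTOR.schedule(() -> {
        try {
            if (!transmitDao.existsById(trace.getId())) {
                // the row was deleted, so this forwarding task simply stops
                System.out.println("forwarding record removed, not reconnecting, clientId=" + trace.getId());
                return;
            }
            System.err.println("cannot reach the server, starting reconnect...");
            new BootNettyClient().connect(trace);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }, 3, TimeUnit.SECONDS);
}

To actually enable reconnection, the commented-out reset() call in channelInactive() also has to be uncommented.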

Controller class

package com.test;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

/**
 * wu
 */
@RestController
public class BootNettyClientController {

    /**
     * Send a message to every client
     * @param content
     * @return
     */
    @PostMapping("/reportAllClientDataToServer")
    public String reportAllClientDataToServer(@RequestParam(name = "content", required = true) String content) {
        for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {
            ChannelHandlerContext ctx = entry.getValue();
            ctx.writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));
        }
        return "ok";
    }

    /**
     * Stop the specified client
     * @param code
     * @return
     * @throws InterruptedException
     */
    @PostMapping("/stopStationByCode")
    public String downDataToClient(@RequestParam(name = "code", required = true) String code) throws InterruptedException {
        ChannelHandlerContext ctx = BootNettyClientGroupCache.get(code);
        ctx.close();
        BootNettyClientGroupCache.remove(code);
        return "success";
    }
}
  • The controller exposes two test endpoints that can be exercised with a tool such as Apifox, e.g. POST /reportAllClientDataToServer?content=hello to broadcast to every client, or POST /stopStationByCode?code=1 to stop the client whose clientId is 1.

Caching TCP connections

package com.test;

import io.netty.channel.ChannelHandlerContext;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * wu
 */
public class BootNettyClientGroupCache {

    /**
     * Holds all connections: the key is the forwarding id, the value is the corresponding channel context
     */
    public static volatile Map<String, ChannelHandlerContext> groupMapCache = new ConcurrentHashMap<String, ChannelHandlerContext>();

    public static void add(String code, ChannelHandlerContext group) {
        groupMapCache.put(code, group);
    }

    public static ChannelHandlerContext get(String code) {
        return groupMapCache.get(code);
    }

    public static void remove(String code) {
        groupMapCache.remove(code);
    }

    public static void save(String code, ChannelHandlerContext channel) {
        if (groupMapCache.get(code) == null) {
            add(code, channel);
        }
    }
}
  • Holds all of the channels
