使用netty实现客户端服务端心跳重连
前言:
公司的加密机调度系统一直使用的是http请求调度的方式去调度,但是会出现网络故障导致某个客户端或者服务端断线的情况,导致很多请求信息以及回执信息丢失的情况,接着我们抛弃了http的方式,改为Tcp的方式去建立客户端和服务器之间的连接,并且要去实现断线重连的功能,经过讨论后决定使用java中成熟的nio框架– netty去解决这一系列的问题。
1.netty简单介绍:
在百度中对netty的解释是:
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
Netty框架并不只是封装了多路复用的IO模型,也包括提供了传统的阻塞式/非阻塞式 同步IO的模型封,Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。其并发高、传输快、封装好的特性受到了许多大公司的青睐,在这里我们就不过多的分析netty的原理和特性了,之后我会写一篇文章专门写一下从io到nio,再到netty的整个过程。重点讲一下netty的魅力所在,今天我们已代码实现为主,讲解一下在springboot架构中,用netty实现服务端和客户端之间的通信以及断线重连等机制。
2. 服务端代码:
首先,引入netty的pom依赖
<dependency> ???????????<groupId>io.netty</groupId> ???????????<artifactId>netty-all</artifactId> ???????????<version>5.0.0.Alpha2</version></dependency>
然后我们在配置文件中写入服务端的ip和端口号,用于连接
在springboot的application启动类中写入服务端的启动start方法,用于在启动项目时自动启动服务端
1 @SpringBootApplication 2 public class Application implements CommandLineRunner { 3 ?4 ????@Value("${netty.server.port}") 5 ????private ?int port; 6 ?7 ????@Value("${netty.server.host}") 8 ????private ?String host; 9 10 ????@Autowired11 ????NettyServer server;12 13 ????public static void main(String[] args) {14 ????????SpringApplication.run(Application.class, args);15 ????}16 17 18 ????@Override19 ????public void run(String... strings) throws Exception {20 ????????this.startServer();21 22 ????}23 24 ????//启动service25 ????public void startServer(){
//这个类实现一个IP套接字地址(IP地址+端口号) 26 ????????InetSocketAddress address = new InetSocketAddress(host, port);27 ????????ChannelFuture future = server.start(address);28 29 ????????Runtime.getRuntime().addShutdownHook(new Thread(){30 ????????????@Override31 ????????????public void run() {32 ????????????????server.destroy();33 ????????????}34 ????????});35 36 ????????future.channel().closeFuture().syncUninterruptibly();37 ????}38 ????}39 40 41 }
ChannelFuture:
Future最早出现于JDK的java.util.concurrent.Future,它用于表示异步操作的结果.由于Netty的Future都是与异步I/O操作相关的,因此命名为ChannelFuture,代表它与Channel操作相关.由于Netty中的所有I / O操作都是异步的,因此Netty为了解决调用者如何获取异步操作结果的问题而专门设计了ChannelFuture接口.
因此,Channel与ChannelFuture可以说形影不离的.
然后我们要去重点看server.start()
public class NettyServer { ???private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class); ???private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); ???private final EventLoopGroup bossGroup = new NioEventLoopGroup(); ???private final EventLoopGroup workGroup = new NioEventLoopGroup(); ???private Channel channel; ???/** ????* 开启及服务线程 ????*/ ???public ChannelFuture start(InetSocketAddress address) { ???????//服务端引导类 ???????ServerBootstrap bootstrap = new ServerBootstrap(); ???????bootstrap.group(bossGroup, workGroup)//通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池" ???????????????.channel(NioServerSocketChannel.class)//指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel ???????????????.childHandler(new NettyServerInitializer())//设置ServerSocketChannel的处理器 ???????????????.option(ChannelOption.SO_BACKLOG, 100)// 设置tcp协议的请求等待队列 ???????????????.childOption(ChannelOption.SO_KEEPALIVE, true);//配置子通道也就是SocketChannel的选项 ???????ChannelFuture future = bootstrap.bind(address).syncUninterruptibly(); ???????logger.info("准备接收——————"); ???????channel = future.channel(); ???????return future; ???} ???public void destroy() { ???????if(channel != null) { ???????????channel.close(); ???????} ???????channelGroup.close(); ???????workGroup.shutdownGracefully(); ???????bossGroup.shutdownGracefully(); ???}}
在这里的设置中,.childHandler(new NettyServerInitializer()) 用于设置了服务器管道 NioServerSocketChannel 的处理器handler,
这个handler是我们自定义封装的一些对channel的public class NettyServerInitializer extends ChannelInitializer<Channel>{
???
@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {
@Override ???protected void initChannel(Channel ch) throws Exception { ???????ChannelPipeline pipeline = ch.pipeline(); ???????//处理日志 ???????//pipeline.addLast(new LoggingHandler(LogLevel.INFO)); ???????//处理心跳 ???????pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
//消息编码 ???????pipeline.addLast(new MessageEncoder());
//粘包长度控制 ???????pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4));
//消息解码 ???????pipeline.addLast(new MessageDecoder());
//自定义hander ???????pipeline.addLast(new TcpMsgHandler()); ???}}
ChannelPipeline :
Netty 的 Channel 过滤器实现原理与 Servlet Filter 机制一致,它将 Channel 的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O 事件拦截器 ChannelHandler 的链表,由 ChannelHandler 来对 I/O 事件进行具体的拦截和处理,可以方便地通过新增和删除 ChannelHandler 来实现不同业务逻辑的定制,能够实现对修改封闭和对扩展到支持。
我们看到我们添加了idleStateHandler用来处理心跳,那么心跳究竟是什么呢,我们先来介绍一下心跳
心跳机制
- 心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制
- 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性
在我们的服务端中,不会主动发心跳给客户端,只会对对应的心跳消息,进行回应,告诉那些给我发心跳的客户端说:我还活着!
服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
设定IdleStateHandler心跳检测每五秒进行一次读检测,如果五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
TcpMsgHandler.java
@Componentpublic class TcpMsgHandler extends ChannelInboundHandlerAdapter { ???private final static Logger logger = LoggerFactory.getLogger(""); ???@Override ???public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ???????if (evt instanceof IdleStateEvent) {} ???????????IdleState state = ((IdleStateEvent) evt).state(); ???????????if (state == IdleState.READER_IDLE) { ???????????????ctx.close(); ???????????} ???????} else { ???????????super.userEventTriggered(ctx, evt); ???????} ???} ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { ???????TcpMsg msg = (TcpMsg) obj; ???????try { ?????????????//处理心跳 ?????????????... ?????????????ctx.writeAndFlush(msg); ???????????} ???????}catch(Exception ex){ ???????????logger.info(ex.getMessage()); ???????} ???}} ???
在这里,我们的channelRead比较简单,只是将客户端发来的心跳直接发回去了,实现了响应客户端心跳请求的目的,除了心跳,我们还可以去定义不同的消息类别,比如说是加密请求,还是处理数据的请求,入库的请求等等,
我们可以自己从channel中获取到客户端发过来的信息做处理,记得要即使响应,比如,心跳中,我们将msg又返回给了channel:
ctx.writeAndFlush(msg);
在handler中,decoder用于解码的作用,将客户端发来的ByteBuf流的形式,转为我们需要的格式,可以转为我们要的对象,或者是一个string字符串
MessageDecoder.java
public class MessageDecoder extends ByteToMessageDecoder { ???private Logger logger = LoggerFactory.getLogger(""); ???@Override ???protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ???????int len = in.readableBytes(); ???????????byte[] bytes = new byte[len];
//将ByteBuf转为byte数组 ???????????in.readBytes(bytes); ???????????try { ???????????????TcpMsg msg = TcpMsg.ByteToObj(bytes); ???????????????out.add(msg); ???????????} catch (Exception ex) { ???????????????logger.error("MessageDecoder",ex); ???????????} ???}}
encoder负责在我们发送数据的时候,将我们的对象、或者是字符串转为byte数组,然后输出
public class MessageEncoder extends MessageToByteEncoder<TcpMsg>{private Logger logger = LoggerFactory.getLogger("");@Overrideprotected void encode(ChannelHandlerContext ctx, TcpMsg msg, ByteBuf out) throws Exception {try{if (msg.getType() != 0){//logger.info("send: " + msg.getType() + ":" + msg.getGuid() + ":" + msg.getBody());}byte[] src = msg.ToBytes();out.writeBytes(src);}catch (Exception e){logger.error("MessageEncoder",e);}}}
3. 客户端代码:
在application配置文件中加入服务端的主机名和端口号
netty.server.host = 127.0.0.1netty.server.port = 9090
启动类Application:
@SpringBootApplicationpublic class Application{ @Autowiredprivate NettyClient client;@Value("${netty.server.port}")private int port;@Value("${netty.server.host}")private String host;public static void main(String[] args) throws Exception {SpringApplication.run(NettyClientApplication.class, args);}@Beanpublic NettyClient nettyClient() {return new NettyClient();}@Overridepublic void run(String... arg0) throws Exception {client.start(host, port);}}
NettyClient.java: 客户端启动类
@Componentpublic class NettyClient { ???//日志输出 ???private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); ???//主要连接地址 ???private static String nettyHost = ""; ???//备用连接地址 ???private static String nettyHostRe = ""; ???private static Integer nettyPort = 0; ???public boolean start(String host1,String host2,int port){ ???????//主要连接地址 ???????nettyHost = host1; ???????//备用连接地址 ???????nettyHostRe = host2; ???????nettyPort = port; ???????//EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据 ???????EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); ???????//NioEventLoop ???????//客户端引导类 ???????Bootstrap bootstrap = new Bootstrap(); ???????//多线程处理 ???????bootstrap.group(nioEventLoopGroup); ???????//指定通道类型为NioServerSocketChannel,一种异步模式 ???????bootstrap.channel(NioSocketChannel.class); ???????//指定请求地址 ???????bootstrap.remoteAddress(new InetSocketAddress(nettyHost,port)); ???????bootstrap.option(ChannelOption.TCP_NODELAY,true); ???????final ConnectionWatchdog watchDog = new ConnectionWatchdog(bootstrap, new HashedWheelTimer(), nettyHost,nettyHostRe, port) { ???????????@Override ???????????public ChannelHandler[] handlers() { ???????????????return new ChannelHandler[]{ ???????????????????new MessageEncoder(), ???????????????????new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4), ???????????????????new MessageDecoder(), ???????????????????this, ???????????????????// 每隔5s的时间触发一次userEventTriggered的方法,并且指定IdleState的状态位是WRITER_IDLE ???????????????????new IdleStateHandler(0, 1, 0, TimeUnit.SECONDS), ???????????????????// 实现userEventTriggered方法,并在state是WRITER_IDLE的时候发送一个心跳包到sever端,告诉server端我还活着 ???????????????????new ClientHeartBeatHandler(), ???????????????}; ???????????} ???????}; ???????final ChannelFuture future; ???????try { ???????????synchronized (bootstrap) { ???????????????bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { ???????????????????@Override ???????????????????protected void initChannel(NioSocketChannel ch) throws Exception { ???????????????????????ch.pipeline().addLast(watchDog.handlers()); ???????????????????} ???????????????}); ???????????????future = bootstrap.connect().sync();// 链接服务器.调用sync()方法会同步阻塞 ???????????????//服务端连接ip: ???????????????logger.info("目前服务端连接ip为" + nettyHost); ???????????} ???????????if (!future.isSuccess()) { ???????????????logger.info("---- 连接服务器失败,2秒后重试 ---------port=" + port); ???????????????future.channel().eventLoop().schedule(new Runnable() { ???????????????????@Override ???????????????????public void run() { ???????????????????????start(nettyHost,nettyHostRe,nettyPort); ???????????????????} ???????????????}, 2L, TimeUnit.SECONDS); ???????????} ???????} catch (Exception e) { ???????????logger.info("exception happends e {}", e); ???????????return false; ???????} ???????return true; ???}}
ConnectionWatchdog.java :重连检测狗,当发现当前的链路不稳定关闭之后,进行重连
@ChannelHandler.Sharablepublic abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask,ChannelHandlerHolder{ ???//日志输出 ???private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class); ???//客户端引导类 ???private Bootstrap bootstrap; ???private Timer timer; ???private final String host; ???//备用服务端ip ???private final String host2; ???//使用ip ???private String useHost; ???private final int port; ???private volatile boolean reconnect = true; ???private int attempts; ???//刷新时间 ???private volatile long refreshTime = 0L; ???//心跳连接标识 ???private volatile boolean heartBeatCheck = false; ???//通道 ???private volatile Channel channel; ???//失败次数 ???private static int failCount; ???public ConnectionWatchdog(Bootstrap boot, Timer timer, String host,String host2, int port) { ???????this.bootstrap = boot; ???????this.timer = timer; ???????this.host = host; ???????this.host2 = host2; ???????this.port = port; ???} ???public boolean isReconnect() { ???????return reconnect; ???} ???public void setReconnect(boolean reconnect) { ???????this.reconnect = reconnect; ???}
//连接成功时调用的方法 ???@Override ???public void channelActive(final ChannelHandlerContext ctx) throws Exception { ???????channel = ctx.channel(); ???????attempts = 0; ???????reconnect =false; ???????refreshTime = new Date().getTime(); ???????if (!heartBeatCheck) { ???????????heartBeatCheck = true; ???????????channel.eventLoop().scheduleAtFixedRate(new Runnable() { ???????????????@Override ???????????????public void run() { ???????????????????long time = new Date().getTime() - refreshTime; ???????????????????logger.info(String.valueOf(time)); ???????????????????if (time > 5 * 1000L) { ???????????????????????channel.close(); ???????????????????????logger.info("心跳检查失败"); ???????????????????} else { ???????????????????????logger.info("心跳检查Successs"); ???????????????????} ???????????????} ???????????}, 5L, 5L, TimeUnit.SECONDS); ???????} ???????logger.info("Connects with {}.", channel); ???????ctx.fireChannelActive(); ???} ???/** ????* 因为链路断掉之后,会触发channelInActive方法,进行重连 2秒重连一次 ????*/ ???@Override ???public void channelInactive(ChannelHandlerContext ctx) throws Exception { ???????reconnect = true; ???????logger.warn("Disconnects with {}, doReconnect = {},attempts == {}", ctx.channel(), reconnect, attempts); ???????if (reconnect) { ???????????/*if (attempts < 12) { ???????????????attempts++; ???????????} else { ???????????????reconnect = false; ???????????}*/ ???????????long timeout = 2; ???????????logger.info("再过 {} 秒客户端将进行重连",timeout); ???????????timer.newTimeout(this, timeout, TimeUnit.SECONDS); ???????} ???} ???/* ???* run启动方法 ???* */ ???public void run(Timeout timeout) throws Exception { ???????//Future表示异步操作的结果 ???????final ChannelFuture future; ???????if(failCount > 2){ ???????????//使用备用ip ???????????if(host.equals(useHost)){ ???????????????useHost = host2; ???????????}else{ ???????????????useHost = host; ???????????} ???????}else { ???????????if(StrUtil.IsNullOrEmpty(useHost)) { ???????????????//首次重连 ???????????????useHost = host; ???????????} ???????} ???????synchronized (bootstrap) { ???????????future = bootstrap.connect(useHost, port); ???????} ???????//使用future监听结果,执行异步操作结束后的回调. ???????future.addListener(new ChannelFutureListener() { ???????????@Override ???????????public void operationComplete(final ChannelFuture f) throws Exception { ???????????????boolean succeed = f.isSuccess(); ???????????????logger.warn("连接通过 {}, {}.", useHost + ":" + port, succeed ? "成功" : "失败"); ???????????????if (!succeed) { ???????????????????logger.info("重连失败"); ???????????????????failCount ++; ???????????????????f.channel().pipeline().fireChannelInactive(); ???????????????}else{ ???????????????????failCount = 0; ???????????????????logger.info("重连成功"); ???????????????} ???????????} ???????}); ???} ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????if (msg instanceof TcpMsg) { ???????????TcpMsg heartMsg = (TcpMsg) msg; ???????????if (heartMsg.getType()>=0) { ???????????????refreshTime = new Date().getTime(); ???????????} ???????????logger.warn("得到服务器响应,响应内容为"+ ((TcpMsg) msg).getBody()); ???????} ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????super.exceptionCaught(ctx, cause); ???????Channel channel = ctx.channel(); ???????logger.info("客户端:"+channel.remoteAddress()+"网络异常"); ???????cause.printStackTrace(); ???????if(channel.isActive())ctx.close(); ???}}
这里我们定义了一个变量:refreshTime,当我们从channel中read到了服务端发来的心跳响应消息的话,就刷新refreshTime为当前时间
当连接成功时,会触发channelActive 方法,在这里我们开启了一个定时任务去判断refreshTime和当前时间的时间差,超过5秒说明断线了,要进行重连,我这里由于配置了两个服务器,所有在我的逻辑中,尝试连接2次以上连不上就去连另一个服务器去了
下面的handler用于发送心跳消息,实现userEventTriggered方法,并在state是WRITER_IDLE的时候发送一个心跳包到sever端,告诉server端我还活着
@Componentpublic class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(ClientHeartBeatHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {clientname = ReadFileUtil.readFile("C:/CrawlProgram/wrapper_nettyClient/name.txt");if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {//用于心跳的客户端类型为0int type = 0;//客户端机器名String body = clientname;TcpMsg msg = new TcpMsg(type,body);try {ctx.writeAndFlush(msg).sync();logger.info("发送消息成功,消息类型为:"+type+",请求id为" + msg.getGuid() + ",客户端机器号为:" + msg.getBody());} catch (Exception ex) {ex.printStackTrace();logger.info("发送失败");}}} else {super.userEventTriggered(ctx, evt);}}}
然后就是和服务端一样的decoder、encoder过程,不同的是,我们在decoder的时候使用了线程池去将任务放入队列中去,防止请求慢的时候丢失任务请求
MessageDecoder.java
public class MessageDecoder extends ByteToMessageDecoder { ???private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class); ???@Autowired ???private VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor; ???//线程池常量 ???public static VisiableThreadPoolTaskExecutor executor; ???private TcpMsg tcpMsg; ???List<Object> out; ???// 用@PostConstruct方法引导绑定 ???@PostConstruct ???public void init() { ???????executor = visiableThreadPoolTaskExecutor; ???????encryptService = ?encrypt; ???????orderService = order; ???} ???@Override ???public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ???????this.context = ctx; ???????this.out = out; ???????int len = in.readableBytes(); ???????if (len > 0) { ???????????logger.info("得到返回数据,长度为" + len); ???????????byte[] bytes = new byte[len]; ???????????in.readBytes(bytes); ???????????TcpMsg msg = TcpMsg.ByteToObj(bytes); ???????????this.tcpMsg = msg; ???????????logger.info("start asyncServiceExecutor"); ???????????executor.execute(new Runnable() { ???????????????@Override ???????????????public void run() { ???????????????????executeTask(); ???????????????} ???????????}); ???????????logger.info("end executeAsync"); ???????} ???}}
这里,我们使用了netty来实现了服务端、客户端通信、心跳检测的功能。体会到了netty的传输效率高、封装好的特性,用起来简单、实用。我们不仅可以做断线重连、还可以做很多业务请求,可以配置多台客户端去做不同的事情,来达到服务器调度的目的。
归根结底,netty还是一个框架的东西,我们还是没有过多的去看透nio的本质、我们要做的不仅仅是会用netty,而且还要了解nio、了解netty的实现原理,它的底层是如何封装的,希望大家多去研究,我们一起去搞懂它
Netty 的 Channel 过滤器实现原理与 Servlet Filter 机制一致,它将 Channel 的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O 事件拦截器 ChannelHandler 的链表,由 ChannelHandler 来对 I/O 事件进行具体的拦截和处理,可以方便地通过新增和删除 ChannelHandler 来实现不同业务逻辑的定制,能够实现对修改封闭和对扩展到支持。
netty实现客户端服务端心跳重连
原文地址:https://www.cnblogs.com/GodHeng/p/9480726.html