分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 前端开发

Netty 实现心跳机制.md

发布时间:2023-09-06 02:22责任编辑:胡小海关键词:暂无标签

netty 心跳机制示例,使用netty4,IdleStateHandler 实现。

本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用。我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的。Netty中自带了一个IdleStateHandler 可以用来实现心跳检测。

心跳检测的逻辑

本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服务端发送消息。如果客户端在“干活”那么服务端必定会收到数据,如果客户端“闲下来了”那么服务端就接收不到这个服务端的消息,既然客户端闲下来了,不干事,那么何必浪费连接资源呢?所以服务端检测到一定时间内客户端不活跃的时候,将客户端连接关闭。本文要实现的逻辑步骤为:

  1. 启动服务端,启动客户端
  2. 客户端向服务端发送"I am alive",并sleep随机时间,用来模拟空闲。
  3. 服务端接收客户端消息,并返回"copy that",客户端空闲时 计数+1.
  4. 服务端客户端继续通信
  5. 服务端检测客户端空闲太多,关闭连接。客户端发现连接关闭了,就退出了。

有了这个思路,我们先来编写服务端。

心跳检测服务端代码

public class HeartBeatServer { ???int port ; ???public HeartBeatServer(int port){ ???????this.port = port; ???} ???public void start(){ ???????ServerBootstrap bootstrap = new ServerBootstrap(); ???????EventLoopGroup boss = new NioEventLoopGroup(); ???????EventLoopGroup worker = new NioEventLoopGroup(); ???????try{ ???????????bootstrap.group(boss,worker) ???????????????????.handler(new LoggingHandler(LogLevel.INFO)) ???????????????????.channel(NioServerSocketChannel.class) ???????????????????.childHandler(new HeartBeatInitializer()); ???????????ChannelFuture future = bootstrap.bind(port).sync(); ???????????future.channel().closeFuture().sync(); ???????}catch(Exception e){ ???????????e.printStackTrace(); ???????}finally { ???????????worker.shutdownGracefully(); ???????????boss.shutdownGracefully(); ???????} ???} ???public static void main(String[] args) throws Exception { ???????HeartBeatServer server = new HeartBeatServer(8090); ???????server.start(); ???}}

熟悉netty的同志,对于上面的模板一样的代码一定是在熟悉不过了。啥都不用看,只需要看childHandler(new HeartBeatInitializer()) 这一句。HeartBeatInitializer就是一个ChannelInitializer顾名思义,他就是在初始化channel的时做一些事情。我们所需要开发的业务逻辑Handler就是在这里添加的。其代码如下:

public class HeartBeatInitializer extends ChannelInitializer<Channel> { ???@Override ???protected void initChannel(Channel channel) throws Exception { ???????ChannelPipeline pipeline = channel.pipeline(); ???????pipeline.addLast("decoder", new StringDecoder()); ???????pipeline.addLast("encoder", new StringEncoder()); ???????pipeline.addLast(new IdleStateHandler(2,2,2, TimeUnit.SECONDS)); ???????pipeline.addLast(new HeartBeatHandler()); ???}}

代码很简单,我们先添加了StringDecoder,和StringEncoder。这两个其实就是编解码用的,下面的IdleStateHandler才是本次心跳的核心组件。我们可以看到IdleStateHandler的构造函数中接收了4个参数,起定义如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);

三个空闲时间参数,以及时间参数的格式。我们的例子中设置的是2,2,2,意思就是客户端2秒没有读/写,这个超时时间就会被触发。超时事件触发就需要我们来处理了,这就是上的HeartBeatInitializer中最后一行的HeartBeatHandler所做的事情。代码如下:

public class HeartBeatHandler extends SimpleChannelInboundHandler<String> { ???int readIdleTimes = 0; ???@Override ???protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { ???????System.out.println(" ====== > [server] message received : " + s); ??????if("I am alive".equals(s)){ ???????????ctx.channel().writeAndFlush("copy that"); ???????}else { ??????????System.out.println(" 其他信息处理 ... "); ??????} ???} ???@Override ???public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ???????IdleStateEvent event = (IdleStateEvent)evt; ???????String eventType = null; ???????switch (event.state()){ ???????????case READER_IDLE: ???????????????eventType = "读空闲"; ???????????????readIdleTimes ++; // 读空闲的计数加1 ???????????????break; ???????????case WRITER_IDLE: ???????????????eventType = "写空闲"; ???????????????// 不处理 ???????????????break; ???????????case ALL_IDLE: ???????????????eventType ="读写空闲"; ???????????????// 不处理 ???????????????break; ???????} ???????System.out.println(ctx.channel().remoteAddress() + "超时事件:" +eventType); ???????if(readIdleTimes > 3){ ???????????System.out.println(" [server]读空闲超过3次,关闭连接"); ???????????ctx.channel().writeAndFlush("you are out"); ???????????ctx.channel().close(); ???????} ???} ???@Override ???public void channelActive(ChannelHandlerContext ctx) throws Exception { ???????System.err.println("=== " + ctx.channel().remoteAddress() + " is active ==="); ???}}

至此,我们的服务端写好了。

心跳检测客户端代码

netty的api设计使得编码的模式非常具有通用性,所以客户端代码和服务端的代码几乎一样:启动client端的代码几乎一样,也需要一个ChannelInitializer,也需要Handler。改动的地方很少,因此本文不对客户端代码进行详细解释。下面给出client端的完整代码:

public class HeartBeatClient ?{ ???int port; ???Channel channel; ???Random random ; ???public HeartBeatClient(int port){ ???????this.port = port; ???????random = new Random(); ???} ???public static void main(String[] args) throws Exception{ ???????HeartBeatClient client = new HeartBeatClient(8090); ???????client.start(); ???} ???public void start() { ???????EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); ???????try{ ???????????Bootstrap bootstrap = new Bootstrap(); ???????????bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) ???????????????????.handler(new HeartBeatClientInitializer()); ???????????connect(bootstrap,port); ???????????String ?text = "I am alive"; ???????????while (channel.isActive()){ ???????????????sendMsg(text); ???????????} ???????}catch(Exception e){ ???????????// do something ???????}finally { ???????????eventLoopGroup.shutdownGracefully(); ???????} ???} ???public void connect(Bootstrap bootstrap,int port) throws Exception{ ???????channel = bootstrap.connect("localhost",8090).sync().channel(); ???} ???public void sendMsg(String text) throws Exception{ ???????int num = random.nextInt(10); ???????Thread.sleep(num * 1000); ???????channel.writeAndFlush(text); ???} ???static class HeartBeatClientInitializer extends ChannelInitializer<Channel> { ???????@Override ???????protected void initChannel(Channel ch) throws Exception { ???????????ChannelPipeline pipeline = ch.pipeline(); ???????????pipeline.addLast("decoder", new StringDecoder()); ???????????pipeline.addLast("encoder", new StringEncoder()); ???????????pipeline.addLast(new HeartBeatClientHandler()); ???????} ???} ???static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> { ???????@Override ???????protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { ???????????System.out.println(" client received :" +msg); ???????????if(msg!= null && msg.equals("you are out")) { ???????????????System.out.println(" server closed connection , so client will close too"); ???????????????ctx.channel().closeFuture(); ???????????} ???????} ???}}

运行代码

在上面的代码写好之后,我们先启动服务端,然后在启动客户端。运行日志如下:

server端:

=== /127.0.0.1:57700 is active === ====== > [server] message received : I am alive ====== > [server] message received : I am alive/127.0.0.1:57700超时事件:写空闲/127.0.0.1:57700超时事件:读空闲/127.0.0.1:57700超时事件:读写空闲/127.0.0.1:57700超时事件:写空闲/127.0.0.1:57700超时事件:读空闲/127.0.0.1:57700超时事件:读写空闲/127.0.0.1:57700超时事件:写空闲 ====== > [server] message received : I am alive/127.0.0.1:57700超时事件:写空闲/127.0.0.1:57700超时事件:读写空闲/127.0.0.1:57700超时事件:读空闲/127.0.0.1:57700超时事件:写空闲/127.0.0.1:57700超时事件:读写空闲/127.0.0.1:57700超时事件:读空闲 [server]读空闲超过3次,关闭连接

client端:

 client sent msg and sleep 2 client received :copy that client received :copy that client sent msg and sleep 6 client sent msg and sleep 6 client received :copy that client received :you are out server closed connection , so client will close tooProcess finished with exit code 0

通过上面的运行日志,我们可以看到:

1.客户端在与服务器成功建立之后,发送了3次‘I am alive‘,服务端也回应了3次:‘copy that‘

2.由于客户端消极怠工,超时了多次,服务端关闭了链接。

3.客户端知道服务端抛弃自己之后,也关闭了连接,程序退出。

以上简单了演示了一下,netty的心跳机制,其实主要就是使用了IdleStateHandler。源码下载:https://gitee.com/dimixu/netty_learn.git

Netty 实现心跳机制.md

原文地址:https://www.cnblogs.com/demingblog/p/9957143.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved