分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 前端开发

<Netty>(十八)(中级篇)心跳连接

发布时间:2023-09-06 01:45责任编辑:胡小海关键词:暂无标签

一,客户端代码

 1 package bhz.netty.heartBeat; 2 ?3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.SocketChannel; 9 import io.netty.channel.socket.nio.NioSocketChannel;10 import io.netty.handler.timeout.ReadTimeoutHandler;11 12 public class Client {13 14 ????15 ????public static void main(String[] args) throws Exception{16 ????????17 ????????EventLoopGroup group = new NioEventLoopGroup();18 ????????Bootstrap b = new Bootstrap();19 ????????b.group(group)20 ?????????.channel(NioSocketChannel.class)21 ?????????.handler(new ChannelInitializer<SocketChannel>() {22 ????????????@Override23 ????????????protected void initChannel(SocketChannel sc) throws Exception {24 ????????????????sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());25 ????????????????sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());26 ????????????????sc.pipeline().addLast(new ReadTimeoutHandler(30));27 ????????????????sc.pipeline().addLast(new ClienHeartBeattHandler());28 ????????????}29 ????????});30 ????????31 ????????ChannelFuture cf = b.connect("10.13.82.18", 8888).sync();32 33 ????????cf.channel().closeFuture().sync();34 ????????group.shutdownGracefully();35 ????}36 }

二,客户端助手类代码

 ?1 package bhz.netty.heartBeat; ?2 ??3 import java.net.InetAddress; ?4 import java.util.HashMap; ?5 import java.util.concurrent.Executors; ?6 import java.util.concurrent.ScheduledExecutorService; ?7 import java.util.concurrent.ScheduledFuture; ?8 import java.util.concurrent.TimeUnit; ?9 ?10 import org.hyperic.sigar.CpuPerc; 11 import org.hyperic.sigar.Mem; 12 import org.hyperic.sigar.Sigar; 13 ?14 import io.netty.channel.ChannelHandlerContext; 15 import io.netty.channel.ChannelInboundHandlerAdapter; 16 import io.netty.util.ReferenceCountUtil; 17 ?18 ?19 public class ClienHeartBeattHandler extends ChannelInboundHandlerAdapter { 20 ?21 ????private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); 22 ?????23 ????private ScheduledFuture<?> heartBeat; 24 ????//主动向服务器发送认证信息 25 ????private InetAddress addr ; 26 ?????27 ????private static final String SUCCESS_KEY = "auth_success_key"; 28 ?29 ????@Override 30 ????public void channelActive(ChannelHandlerContext ctx) throws Exception { 31 ????????addr = InetAddress.getLocalHost(); 32 ????????String ip = addr.getHostAddress(); 33 ????????System.err.println("ip:" + ip); 34 ????????String key = "1234"; 35 ????????String auth = ip + "," + key; 36 ????????ctx.writeAndFlush(auth); 37 ????} 38 ?????39 ????@Override 40 ????public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 41 ????????try { 42 ????????????if(msg instanceof String){ 43 ????????????????String ret = (String)msg; 44 ????????????????if(SUCCESS_KEY.equals(ret)){ 45 ????????????????????// 握手成功,主动发送心跳消息 46 ????????????????????this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 8, TimeUnit.SECONDS); 47 ????????????????????System.out.println(msg); ????????????????48 ????????????????} 49 ????????????????else { 50 ????????????????????System.out.println(msg); 51 ????????????????} 52 ????????????} 53 ????????} finally { 54 ????????????ReferenceCountUtil.release(msg); 55 ????????} 56 ????} 57 ?58 ????private class HeartBeatTask implements Runnable { 59 ????????private final ChannelHandlerContext ctx; 60 ?61 ????????public HeartBeatTask(final ChannelHandlerContext ctx) { 62 ????????????this.ctx = ctx; 63 ????????} 64 ?????65 ????????@Override 66 ????????public void run() { 67 ????????????try { 68 ????????????????RequestInfo info = new RequestInfo(); 69 ????????????????//ip 70 ????????????????info.setIp(addr.getHostAddress()); 71 ????????????????Sigar sigar = new Sigar(); 72 ????????????????//cpu prec 73 ????????????????CpuPerc cpuPerc = sigar.getCpuPerc(); 74 ????????????????HashMap<String, Object> cpuPercMap = new HashMap<String, Object>(); 75 ????????????????cpuPercMap.put("combined", cpuPerc.getCombined()); 76 ????????????????cpuPercMap.put("user", cpuPerc.getUser()); 77 ????????????????cpuPercMap.put("sys", cpuPerc.getSys()); 78 ????????????????cpuPercMap.put("wait", cpuPerc.getWait()); 79 ????????????????cpuPercMap.put("idle", cpuPerc.getIdle()); 80 ????????????????// memory 81 ????????????????Mem mem = sigar.getMem(); 82 ????????????????HashMap<String, Object> memoryMap = new HashMap<String, Object>(); 83 ????????????????memoryMap.put("total", mem.getTotal() / 1024L); 84 ????????????????memoryMap.put("used", mem.getUsed() / 1024L); 85 ????????????????memoryMap.put("free", mem.getFree() / 1024L); 86 ????????????????info.setCpuPercMap(cpuPercMap); 87 ????????????????info.setMemoryMap(memoryMap); 88 ????????????????ctx.writeAndFlush(info); 89 ?????????????????90 ????????????} catch (Exception e) { 91 ????????????????e.printStackTrace(); 92 ????????????} 93 ????????} 94 ?95 ????????public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 96 ????????????cause.printStackTrace(); 97 ????????????if (heartBeat != null) { 98 ????????????????heartBeat.cancel(true); 99 ????????????????heartBeat = null;100 ????????????}101 ????????????ctx.fireExceptionCaught(cause);102 ????????}103 ????????104 ????}105 }

 1,连接成功后会先执行客户端的public void channelActive(ChannelHandlerContext ctx) throws Exception {方法注意这个时候还没有去执行服务器端的代码。

所以先打印出来ip:10.13.82.18这行代码。

然后现在去执行服务器端的代码

3,第40行代码这时候会返回auth_success_key,

4,第46行代码会执行,由于是异步的所以会先走第47行代码,所以客户端会打印出来auth_success_key这个字段

6,这时候服务端返回的会执行第50行代码所以打印出来info received!

7,由于一直处于一个连接的状态而且使用的是心跳连接每过5秒就会连接一次,这时候由于是一直连接的状态,所以就会一直返回info received!

三,封装的实体类对象

 1 package bhz.netty.heartBeat; 2 ?3 import java.io.Serializable; 4 import java.util.HashMap; 5 ?6 public class RequestInfo implements Serializable { 7 ?8 ????private String ip ; 9 ????private HashMap<String, Object> cpuPercMap ;10 ????private HashMap<String, Object> memoryMap;11 ????//.. other field12 ????13 ????public String getIp() {14 ????????return ip;15 ????}16 ????public void setIp(String ip) {17 ????????this.ip = ip;18 ????}19 ????public HashMap<String, Object> getCpuPercMap() {20 ????????return cpuPercMap;21 ????}22 ????public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {23 ????????this.cpuPercMap = cpuPercMap;24 ????}25 ????public HashMap<String, Object> getMemoryMap() {26 ????????return memoryMap;27 ????}28 ????public void setMemoryMap(HashMap<String, Object> memoryMap) {29 ????????this.memoryMap = memoryMap;30 ????}31 ????32 ????33 }

四,编解码工具类

 1 package bhz.netty.heartBeat; 2 ?3 import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; 4 import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; 5 import io.netty.handler.codec.marshalling.MarshallerProvider; 6 import io.netty.handler.codec.marshalling.MarshallingDecoder; 7 import io.netty.handler.codec.marshalling.MarshallingEncoder; 8 import io.netty.handler.codec.marshalling.UnmarshallerProvider; 9 10 import org.jboss.marshalling.MarshallerFactory;11 import org.jboss.marshalling.Marshalling;12 import org.jboss.marshalling.MarshallingConfiguration;13 14 /**15 ?* Marshalling工厂16 ?* @author(alienware)17 ?* @since 2014-12-1618 ?*/19 public final class MarshallingCodeCFactory {20 21 ????/**22 ?????* 创建Jboss Marshalling解码器MarshallingDecoder23 ?????* @return MarshallingDecoder24 ?????*/25 ????public static MarshallingDecoder buildMarshallingDecoder() {26 ????????//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。27 ????????final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");28 ????????//创建了MarshallingConfiguration对象,配置了版本号为5 29 ????????final MarshallingConfiguration configuration = new MarshallingConfiguration();30 ????????configuration.setVersion(5);31 ????????//根据marshallerFactory和configuration创建provider32 ????????UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);33 ????????//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度34 ????????MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);35 ????????return decoder;36 ????}37 38 ????/**39 ?????* 创建Jboss Marshalling编码器MarshallingEncoder40 ?????* @return MarshallingEncoder41 ?????*/42 ????public static MarshallingEncoder buildMarshallingEncoder() {43 ????????final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");44 ????????final MarshallingConfiguration configuration = new MarshallingConfiguration();45 ????????configuration.setVersion(5);46 ????????MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);47 ????????//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组48 ????????MarshallingEncoder encoder = new MarshallingEncoder(provider);49 ????????return encoder;50 ????}51 }

五,服务端代码

 1 package bhz.netty.heartBeat; 2 ?3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioServerSocketChannel;11 import io.netty.handler.logging.LogLevel;12 import io.netty.handler.logging.LoggingHandler;13 import io.netty.handler.timeout.ReadTimeoutHandler;14 15 public class Server {16 17 ????public static void main(String[] args) throws Exception{18 ????????19 ????????EventLoopGroup pGroup = new NioEventLoopGroup();20 ????????EventLoopGroup cGroup = new NioEventLoopGroup();21 ????????22 ????????ServerBootstrap b = new ServerBootstrap();23 ????????b.group(pGroup, cGroup)24 ?????????.channel(NioServerSocketChannel.class)25 ?????????.option(ChannelOption.SO_BACKLOG, 1024)26 ?????????//设置日志27 ?????????.handler(new LoggingHandler(LogLevel.INFO))28 ?????????.childHandler(new ChannelInitializer<SocketChannel>() {29 ????????????protected void initChannel(SocketChannel sc) throws Exception {30 ????????????????sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());31 ????????????????sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());32 ????????????????sc.pipeline().addLast(new ReadTimeoutHandler(30));33 ????????????????sc.pipeline().addLast(new ServerHeartBeatHandler());34 ????????????}35 ????????});36 ????????37 ????????ChannelFuture cf = b.bind(8888).sync();38 ????????39 ????????cf.channel().closeFuture().sync();40 ????????pGroup.shutdownGracefully();41 ????????cGroup.shutdownGracefully();42 ????????43 ????}44 }

,

六,服务端助手类代码

 1 package bhz.netty.heartBeat; 2 ?3 import java.util.HashMap; 4 ?5 import io.netty.channel.ChannelFutureListener; 6 import io.netty.channel.ChannelHandlerContext; 7 import io.netty.channel.ChannelInboundHandlerAdapter; 8 ?9 public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {10 ????11 ????/** key:ip value:auth */12 ????private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();13 ????private static final String SUCCESS_KEY = "auth_success_key";14 ????15 ????static {16 ????????AUTH_IP_MAP.put("10.13.82.18", "1234");17 ????}18 ????19 ????private boolean auth(ChannelHandlerContext ctx, Object msg){20 ????????????//System.out.println(msg);21 ????????????String [] ret = ((String) msg).split(",");22 ????????????String auth = AUTH_IP_MAP.get(ret[0]);23 ????????????if(auth != null && auth.equals(ret[1])){24 ????????????????ctx.writeAndFlush(SUCCESS_KEY);25 ????????????????return true;26 ????????????} else {27 ????????????????ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);28 ????????????????return false;29 ????????????}30 ????}31 ????32 ????@Override33 ????public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {34 ????????if(msg instanceof String){35 ????????????auth(ctx, msg);36 ????????} else if (msg instanceof RequestInfo) {37 ????????????38 ????????????RequestInfo info = (RequestInfo) msg;39 ????????????System.out.println("--------------------------------------------");40 ????????????System.out.println("当前主机ip为: " + info.getIp());41 ????????????System.out.println("当前主机cpu情况: ");42 ????????????HashMap<String, Object> cpu = info.getCpuPercMap();43 ????????????System.out.println("总使用率: " + cpu.get("combined"));44 ????????????System.out.println("用户使用率: " + cpu.get("user"));45 ????????????System.out.println("系统使用率: " + cpu.get("sys"));46 ????????????System.out.println("等待率: " + cpu.get("wait"));47 ????????????System.out.println("空闲率: " + cpu.get("idle"));48 ????????????49 ????????????System.out.println("当前主机memory情况: ");50 ????????????HashMap<String, Object> memory = info.getMemoryMap();51 ????????????System.out.println("内存总量: " + memory.get("total"));52 ????????????System.out.println("当前内存使用量: " + memory.get("used"));53 ????????????System.out.println("当前内存剩余量: " + memory.get("free"));54 ????????????System.out.println("--------------------------------------------");55 ????????????56 ????????????ctx.writeAndFlush("info received!");57 ????????} else {58 ????????????ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);59 ????????}60 ????}61 62 63 }

 2,第35行代码执行,这时候会返回auth_success_key,所以客户端会打印出来auth_success_key这个字段

5,由46行代码触发的回调函数,会触发38-55行的代码,这时候会返回客户端

七,结果

客户端结果

1 ip:10.13.82.182 auth_success_key3 info received!4 info received!5 info received!6 info received!7 info received!8 info received!

服务端结果

 ?1 当前主机ip为: 10.13.82.18 ?2 当前主机cpu情况: ??3 总使用率: 0.15106815869786366 ?4 用户使用率: 0.1185147507629705 ?5 系统使用率: 0.03255340793489318 ?6 等待率: 0.0 ?7 空闲率: 0.8489318413021363 ?8 当前主机memory情况: ??9 内存总量: 3865836 10 当前内存使用量: 3537528 11 当前内存剩余量: 328308 12 -------------------------------------------- 13 -------------------------------------------- 14 当前主机ip为: 10.13.82.18 15 当前主机cpu情况: ?16 总使用率: 0.15106815869786366 17 用户使用率: 0.1190233977619532 18 系统使用率: 0.03204476093591048 19 等待率: 0.0 20 空闲率: 0.8413021363173957 21 当前主机memory情况: ?22 内存总量: 3865836 23 当前内存使用量: 3518000 24 当前内存剩余量: 347836 25 -------------------------------------------- 26 -------------------------------------------- 27 当前主机ip为: 10.13.82.18 28 当前主机cpu情况: ?29 总使用率: 0.16954164613109907 30 用户使用率: 0.16165598817151305 31 系统使用率: 0.007885657959586003 32 等待率: 0.0 33 空闲率: 0.830458353868901 34 当前主机memory情况: ?35 内存总量: 3865836 36 当前内存使用量: 3521124 37 当前内存剩余量: 344712 38 -------------------------------------------- 39 -------------------------------------------- 40 当前主机ip为: 10.13.82.18 41 当前主机cpu情况: ?42 总使用率: 0.163265306122449 43 用户使用率: 0.14095870906502136 44 系统使用率: 0.022306597057427623 45 等待率: 0.0 46 空闲率: 0.8367346938775511 47 当前主机memory情况: ?48 内存总量: 3865836 49 当前内存使用量: 3495956 50 当前内存剩余量: 369880 51 -------------------------------------------- 52 -------------------------------------------- 53 当前主机ip为: 10.13.82.18 54 当前主机cpu情况: ?55 总使用率: 0.16366366366366367 56 用户使用率: 0.14814814814814814 57 系统使用率: 0.015515515515515516 58 等待率: 0.0 59 空闲率: 0.8363363363363363 60 当前主机memory情况: ?61 内存总量: 3865836 62 当前内存使用量: 3493664 63 当前内存剩余量: 372172 64 -------------------------------------------- 65 -------------------------------------------- 66 当前主机ip为: 10.13.82.18 67 当前主机cpu情况: ?68 总使用率: 0.17497456765005087 69 用户使用率: 0.1515768056968464 70 系统使用率: 0.023397761953204477 71 等待率: 0.0 72 空闲率: 0.8250254323499492 73 当前主机memory情况: ?74 内存总量: 3865836 75 当前内存使用量: 3489476 76 当前内存剩余量: 376360 77 -------------------------------------------- 78 -------------------------------------------- 79 当前主机ip为: 10.13.82.18 80 当前主机cpu情况: ?81 总使用率: 0.14637752587481517 82 用户使用率: 0.10004928536224741 83 系统使用率: 0.046328240512567766 84 等待率: 0.0 85 空闲率: 0.8457368161655988 86 当前主机memory情况: ?87 内存总量: 3865836 88 当前内存使用量: 3472776 89 当前内存剩余量: 393060 90 -------------------------------------------- 91 -------------------------------------------- 92 当前主机ip为: 10.13.82.18 93 当前主机cpu情况: ?94 总使用率: 0.22200488997555012 95 用户使用率: 0.1374083129584352 96 系统使用率: 0.08459657701711491 97 等待率: 0.0 98 空闲率: 0.7779951100244499 99 当前主机memory情况: 100 内存总量: 3865836101 当前内存使用量: 3467608102 当前内存剩余量: 398228103 --------------------------------------------104 --------------------------------------------105 当前主机ip为: 10.13.82.18106 当前主机cpu情况: 107 总使用率: 0.1416267942583732108 用户使用率: 0.13444976076555024109 系统使用率: 0.007177033492822967110 等待率: 0.0111 空闲率: 0.8583732057416268112 当前主机memory情况: 113 内存总量: 3865836114 当前内存使用量: 3470100115 当前内存剩余量: 395736

 注意红字的顺序及解释

<Netty>(十八)(中级篇)心跳连接

原文地址:https://www.cnblogs.com/qingruihappy/p/8570524.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved