分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

基于netty实现的长连接,心跳机制及重连机制

发布时间:2023-09-06 02:34责任编辑:白小东关键词:暂无标签
技术:maven3.0.5 + netty4.1.33 + jdk1.8
 

概述

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。 “快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

详细

代码下载:http://www.demodashi.com/demo/15012.html

详细

本篇demo实现的功能是基于netty的心跳机制和长连接以及重连机制,最关键的就是通过netty中的 IdleStateHandler 的超时机制来实现心跳和重连 ,然后通过org.msgpack编码器来实现跨平台数据传输,

实现的功能就是通过Scanner来输入消息得到服务端的回应,超过设定的超时时间就触发超时事件来进行心跳传输,如果服务端宕机客户端就会一直发起重连。

一、运行效果

服务端:

客户端:

二、实现过程

  1. 在maven pom文件添加依赖:

  2.  ??????<!-- 解码and编码器 --> ???????<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --> ???????<dependency> ???????????<groupId>org.msgpack</groupId> ???????????<artifactId>msgpack</artifactId> ???????????<version>0.6.12</version> ???????</dependency> ???????<!-- netty 核心依赖 --> ???????<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> ?????<dependency> ???<groupId>io.netty</groupId> ???<artifactId>netty-all</artifactId> ???<version>4.1.33.Final</version> ?</dependency>
  3. 导入以上依赖 ↓ 创建配置模型model(模型类) , TypeData(参数配置类) ↓ 创建解码and编码器MsgPckDecode(解码器) ,MsgPckEncode(编码器) ↓ 创建各自的控制器 AbstractClientChannelInboundHandleAdapter,AbstractServerChannelInboundHandleAdapter↓ 创建客户端及客户端控制器Client(客户端启动类) , ClientHandler(客户端控制器) ↓ 创建服务端以及控制器Server(客户端启动类) , ServerHandler(客户端控制器)ps:本demo使用了msgpack , It’s like JSON. but fast and small.
  4. package com.zxh.demo.model;import java.io.Serializable;import org.msgpack.annotation.Message;/** * 消息类型分离器 * @author Administrator * */@Messagepublic class Model implements Serializable{ ???private static final long serialVersionUID = 1L; ???//类型 ???private int type; ???//内容 ???private String body; ???public String getBody() { ???????return body; ???} ???public void setBody(String body) { ???????this.body = body; ???} ???public int getType() { ???????return type; ???} ???public void setType(int type) { ???????this.type = type; ???} ???@Override ???public String toString() { ???????return "Model [type=" + type + ", body=" + body + "]"; ???}}
  5. 编写一个配置类接口,用于控制心跳包和应用消息的处理
  6. package com.zxh.demo.model;/** * 配置项 * @author Administrator * */public interface TypeData { ???byte PING = 1; ???byte PONG = 2; ?????//顾客 ???byte CUSTOMER = 3;}

    创建MsgPckDecode(解码器)

  7. package com.zxh.demo.model;import java.util.List;import org.msgpack.MessagePack;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;/** * 解码器 * @author Administrator * */public class MsgPckDecode extends MessageToMessageDecoder<ByteBuf>{ ???@Override ???protected void decode(ChannelHandlerContext ctx, ByteBuf msg, ???????????List<Object> out) throws Exception { ???????final ?byte[] array; ???????final int length = msg.readableBytes(); ???????array = new byte[length]; ???????msg.getBytes(msg.readerIndex(), array, 0, length); ???????MessagePack pack = new MessagePack(); ???????out.add(pack.read(array, Model.class)); ???}}
  8. 创建MsgPckEncode(编码器)
  9. package com.zxh.demo.model;import org.msgpack.MessagePack;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;/** * 编码器 * @author Administrator * */public class MsgPckEncode extends MessageToByteEncoder<Object>{ ???@Override ???protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) ???????????throws Exception { ???????// TODO Auto-generated method stub ???????MessagePack pack = new MessagePack(); ???????byte[] write = pack.write(msg); ???????buf.writeBytes(write); ???}}
  10. 创建client客户端:
  11. package com.zxh.demo.client;import java.util.Scanner;import java.util.concurrent.TimeUnit;import com.zxh.demo.model.Model;import com.zxh.demo.model.MsgPckDecode;import com.zxh.demo.model.MsgPckEncode;import com.zxh.demo.model.TypeData;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.timeout.IdleStateHandler;public class Client { ???private NioEventLoopGroup worker = new NioEventLoopGroup(); ???private Channel channel; ???private Bootstrap bootstrap; ???public static void main(String[] args) { ???????Client ?client = new Client(); ???????client.start(); ???????client.sendData(); ?????????} ???private void start() { ???????bootstrap = new Bootstrap(); ???????????????bootstrap.group(worker) ???????.channel(NioSocketChannel.class) ???????.option(ChannelOption.TCP_NODELAY, true) ???????.handler(new ChannelInitializer<Channel>() { ???????????@Override ???????????protected void initChannel(Channel ch) throws Exception { ???????????????// TODO Auto-generated method stub ???????????????ChannelPipeline pipeline = ch.pipeline(); ???????????????pipeline.addLast(new IdleStateHandler(0,0,5)); ???????????????pipeline.addLast(new MsgPckDecode()); ???????????????pipeline.addLast(new MsgPckEncode()); ???????????????pipeline.addLast(new ClientHandler(Client.this)); ?????????????????????????} ??????????????????}); ????????doConnect(); ???} ???/** ????* 连接服务端 and 重连 ????*/ ???protected void doConnect() { ???????if (channel != null && channel.isActive()){ ???????????return; ???????} ??????????????ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081); ???????//实现监听通道连接的方法 ???????connect.addListener(new ChannelFutureListener() { ???????????@Override ???????????public void operationComplete(ChannelFuture channelFuture) throws Exception { ???????????????if(channelFuture.isSuccess()){ ???????????????????channel = channelFuture.channel(); ???????????????????System.out.println("连接服务端成功"); ???????????????}else{ ???????????????????System.out.println("每隔2s重连...."); ???????????????????channelFuture.channel().eventLoop().schedule(new Runnable() { ???????????????????????@Override ???????????????????????public void run() { ???????????????????????????doConnect(); ???????????????????????} ???????????????????},2,TimeUnit.SECONDS); ???????????????} ??????????????} ???????}); ????????} ??????/** ????* 向服务端发送消息 ????*/ ???private void sendData() { ???????Scanner sc= new Scanner(System.in); ????????for (int i = 0; i < 1000; i++) { ???????????if(channel != null && channel.isActive()){ ?????????????????????????????//获取一个键盘扫描器 ???????????????String nextLine = sc.nextLine(); ???????????????Model model = new Model(); ???????????????model.setType(TypeData.CUSTOMER); ???????????????model.setBody(nextLine); ???????????????channel.writeAndFlush(model); ???????????} ???????} ???}}
  12. 创建Server服务端:
  13. package com.zxh.demo.server;import com.zxh.demo.model.MsgPckDecode;import com.zxh.demo.model.MsgPckEncode;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.timeout.IdleStateHandler;public class Server { ???public static void main(String[] args) { ???????EventLoopGroup bossGroup = new NioEventLoopGroup(1); ???????EventLoopGroup workerGroup = new NioEventLoopGroup(4); ???????try { ???????????ServerBootstrap serverBootstrap = new ServerBootstrap(); ???????????serverBootstrap.group(bossGroup, workerGroup) ???????????.channel(NioServerSocketChannel.class) ???????????.localAddress(8081) ???????????.childHandler(new ChannelInitializer<Channel>() { ???????????????@Override ???????????????protected void initChannel(Channel ch) throws Exception { ???????????????????// TODO Auto-generated method stub ???????????????????ChannelPipeline pipeline = ch.pipeline(); ???????????????????pipeline.addLast(new IdleStateHandler(10,0,0)); ???????????????????pipeline.addLast(new MsgPckDecode()); ???????????????????pipeline.addLast(new MsgPckEncode()); ???????????????????pipeline.addLast(new ServerHandler()); ????????????????} ???????????}); ????????????????????System.out.println("start server by port 8081 --"); ???????????ChannelFuture sync = serverBootstrap.bind().sync(); ???????????sync.channel().closeFuture().sync(); ???????} catch (InterruptedException e) { ???????????// TODO Auto-generated catch block ???????????e.printStackTrace(); ???????}finally{ ???????????//优雅的关闭资源 ???????????bossGroup.shutdownGracefully(); ???????????workerGroup.shutdownGracefully(); ???????} ???}}

先运行服务端,然后再启动客户端 会根据设置的端口连接服务端,在客户端输入消息就会得到服务端的回应,如果超过5秒没有进行读写就会触发IdleStateHandler类超时事件 来进行心跳包的传输 ,服务端未检测到客户端的读写或者心跳就会主动关闭channel通道

三、项目结构图

四、补充

所谓的心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性

代码下载:http://www.demodashi.com/demo/15012.html

注:本文著作权归作者,由demo大师发表,拒绝转载,转载需要作者授权

基于netty实现的长连接,心跳机制及重连机制

原文地址:https://www.cnblogs.com/demodashi/p/10503459.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved