第一种,使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启的状态。如果服务器性能足够好,并且我们的客户端数量也比较少的情况下,是适合使用长连接的通道。
第二种,采用短连接方式,一次性批量提交数据,也就是我们会把数据保存在本地临时缓冲区或者临时表里。当达到数量时,就进行批量提交;或者通过定时任务轮询提交。这种情况是有弊端的,就是无法做到实时传输。如果应用程序对实时性要求不高,可以考虑使用。
第三种,采用一种特殊的长连接。特殊在哪里呢?在指定的某一时间之内,服务器与某台客户端没有任何通信,则断开连接,如果断开连接后,客户端又需要向服务器发送请求,那么再次建立连接。这里有点CachedThreadPool的味道。
一般情况下我们采用第三种方案
一,客户端类
1 package bhz.netty.runtime; 2 ?3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelFuture; 6 import io.netty.channel.ChannelInitializer; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel;10 import io.netty.channel.socket.nio.NioSocketChannel;11 import io.netty.handler.timeout.ReadTimeoutHandler;12 13 public class Client {14 15 ????public static void main(String[] args) throws Exception {16 ????????//ONE:17 ????????//1 线程工作组18 ????????EventLoopGroup work = new NioEventLoopGroup();19 ????????20 ????????//TWO:21 ????????//3 辅助类。用于帮助我们创建NETTY服务22 ????????Bootstrap b = new Bootstrap();23 ????????b.group(work) ???//绑定工作线程组24 ?????????.channel(NioSocketChannel.class) ???//设置NIO的模式25 ?????????// 初始化绑定服务通道26 ?????????.handler(new ChannelInitializer<SocketChannel>() {27 ????????????@Override28 ????????????protected void initChannel(SocketChannel sc) throws Exception {29 ????????????????// 为通道进行初始化: 数据传输过来的时候会进行拦截和执行30 ????????????????sc.pipeline().addLast(new ReadTimeoutHandler(5));31 ????????????????sc.pipeline().addLast(new ClientHandler());32 ????????????}33 ?????????});34 ????????35 ????????ChannelFuture cf = ?b.connect("127.0.0.1", 8765).syncUninterruptibly();36 ????????37 ????????cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-1".getBytes()));38 ????39 ????????Thread.sleep(1000);40 ????????41 ????????cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-2".getBytes()));42 ????????43 ????????Thread.sleep(1000);44 ????????cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-3".getBytes()));45 ????????46 ????????Thread.sleep(1000);47 ????????cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-4".getBytes()));48 ????????49 ????????Thread.sleep(1000);50 ????????cf.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty!-5".getBytes()));51 ????????52 ????????53 ????????//释放连接54 ????????cf.channel().closeFuture().sync();55 ????????work.shutdownGracefully();56 ????}57 }
二,客户端助手类
1 package bhz.netty.runtime; 2 ?3 import io.netty.buffer.ByteBuf; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 import io.netty.util.ReferenceCountUtil; 7 ?8 public class ClientHandler ?extends ChannelInboundHandlerAdapter { 9 10 ????11 ????@Override12 ????public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {13 ????????try {14 ????????????ByteBuf buffer = (ByteBuf)msg;15 ?????????????byte[] data = new byte[buffer.readableBytes()];16 ?????????????buffer.readBytes(data);17 ?????????????String str = new String(data, "utf-8");18 ?????????????System.err.println("客户端:" + str);19 ????????} finally {20 ????????????ReferenceCountUtil.release(msg);21 ????????}22 ????}23 ????24 ????@Override25 ????public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)26 ????????????throws Exception {27 ????????ctx.close();28 ????}29 ????30 }
三,服务端类
1 package bhz.netty.runtime; 2 ?3 ?4 import io.netty.bootstrap.ServerBootstrap; 5 import io.netty.channel.ChannelFuture; 6 import io.netty.channel.ChannelInitializer; 7 import io.netty.channel.ChannelOption; 8 import io.netty.channel.EventLoopGroup; 9 import io.netty.channel.nio.NioEventLoopGroup;10 import io.netty.channel.socket.SocketChannel;11 import io.netty.channel.socket.nio.NioServerSocketChannel;12 import io.netty.handler.timeout.ReadTimeoutHandler;13 14 public class Server {15 16 ????17 ????public static void main(String[] args) throws Exception {18 ????????//ONE:19 ????????//1 用于接受客户端连接的线程工作组20 ????????EventLoopGroup boss = new NioEventLoopGroup();21 ????????//2 用于对接受客户端连接读写操作的线程工作组22 ????????EventLoopGroup work = new NioEventLoopGroup();23 ????????24 ????????//TWO:25 ????????//3 辅助类。用于帮助我们创建NETTY服务26 ????????ServerBootstrap b = new ServerBootstrap();27 ????????b.group(boss, work) ???//绑定两个工作线程组28 ?????????.channel(NioServerSocketChannel.class) ???//设置NIO的模式29 ?????????.option(ChannelOption.SO_BACKLOG, 1024) ???//设置TCP缓冲区30 ?????????//.option(ChannelOption.SO_SNDBUF, 32*1024) ???// 设置发送数据的缓存大小31 ?????????.option(ChannelOption.SO_RCVBUF, 32*1024) ???// 设置接受数据的缓存大小32 ?????????.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) ???// 设置保持连接33 ?????????.childOption(ChannelOption.SO_SNDBUF, 32*1024)34 ?????????// 初始化绑定服务通道35 ?????????.childHandler(new ChannelInitializer<SocketChannel>() {36 ????????????@Override37 ????????????protected void initChannel(SocketChannel sc) throws Exception {38 ????????????????// 为通道进行初始化: 数据传输过来的时候会进行拦截和执行39 ????????????????sc.pipeline().addLast(new ReadTimeoutHandler(5));40 ????????????????sc.pipeline().addLast(new ServerHandler());41 ????????????}42 ?????????});43 ????????44 ????????ChannelFuture cf = b.bind(8765).sync();45 ????????46 ????????47 ????????48 ????????//释放连接49 ????????cf.channel().closeFuture().sync();50 ????????work.shutdownGracefully();51 ????????boss.shutdownGracefully();52 ????}53 }
四,服务端助手类
1 package bhz.netty.runtime; 2 ?3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelFutureListener; 6 import io.netty.channel.ChannelHandlerContext; 7 import io.netty.channel.ChannelInboundHandlerAdapter; 8 import io.netty.util.ReferenceCountUtil; 9 10 public class ServerHandler extends ChannelInboundHandlerAdapter {11 12 ????/**13 ?????* 当我们通道进行激活的时候 触发的监听方法14 ?????*/15 ????@Override16 ????public void channelActive(ChannelHandlerContext ctx) throws Exception {17 ????18 ????????System.err.println("--------通道激活------------");19 ????}20 ????21 ????/**22 ?????* 当我们的通道里有数据进行读取的时候 触发的监听方法23 ?????*/24 ????@Override25 ????public void channelRead(ChannelHandlerContext ctx /*NETTY服务上下文*/, Object msg /*实际的传输数据*/) throws Exception {26 // ???????try{27 ????????????//do something with msg28 ????????????29 ????????????//NIO通信(传输的数据是什么? ----> buffer对象)30 ????????????ByteBuf buf = (ByteBuf)msg;31 ????????????byte[] request = new byte[buf.readableBytes()];32 ????????????buf.readBytes(request);33 ????????????String body = new String(request, "utf-8");34 ????????????System.out.println("服务器: " + body); 35 ????????????36 ????????????//ByteBuf37 ????????????String response = "我是返回的数据!!";38 ????????????ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));39 ????????????//添加addListener 可以触发关闭通道监听事件40 ????????????//.addListener(ChannelFutureListener.CLOSE); ???????????41 ????????????42 // ???????} finally {43 // ???????????ReferenceCountUtil.release(msg);44 // ???????}45 ????????46 47 48 ????????49 ????}50 ????51 ????@Override52 ????public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {53 ???????System.err.println("--------数据读取完毕----------");54 ????}55 ????56 ????@Override57 ????public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)58 ????????????throws Exception {59 ????????System.err.println("--------数据读异常----------: ");60 ????????cause.printStackTrace();61 ????????ctx.close();62 ????}63 ????64 ????65 ????66 ????67 ????68 ????69 ????70 ????71 ????72 ????73 ????74 ????75 ????76 }
五,结果
客户端结果
1 客户端:我是返回的数据!!2 客户端:我是返回的数据!!3 客户端:我是返回的数据!!4 客户端:我是返回的数据!!5 客户端:我是返回的数据!!
客户端已近结束了。
服务端结果
1 --------通道激活------------ 2 服务器: hello netty!-1 3 --------数据读取完毕---------- 4 服务器: hello netty!-2 5 --------数据读取完毕---------- 6 服务器: hello netty!-3 7 --------数据读取完毕---------- 8 服务器: hello netty!-4 9 --------数据读取完毕----------10 服务器: hello netty!-511 --------数据读取完毕----------12 --------数据读异常----------: 13 io.netty.handler.timeout.ReadTimeoutException
<Netty>(十七)(中级篇)实践----数据通信
原文地址:https://www.cnblogs.com/qingruihappy/p/8570111.html