netty是基于javaNio模型的网络编程框架。很多框架底层也是用netty实现的 比如dubbo
与NIO的区别
1.简化了API的使用。基于事件驱动。只需要在对应的事件写相应的业务就行了。
2.上层封装多种协议的实现 webSoket,http。同时修复了NIO的bug(内存泄漏 nio buffer构造函数私有无法扩展问题)
Server代码
package com.liqiang.nettyTest2;import java.net.InetSocketAddress;import java.util.List;import java.util.Vector;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class Server { private int port;//监听端口 private Vector<ChannelHandlerContext> clients;//保存在线客户端信息 public Server(int port) { ????clients=new Vector<ChannelHandlerContext>(); ????this.port=port; } //广播 public void sendAll(String msg) { ????clients.forEach(c->{ ????????c.writeAndFlush(msg); ????}); } public void addClient(ChannelHandlerContext client) { ????clients.add(client); } public void start() { ????/** ?????* NioEventLoopGroup 内部维护一个线程池 ?????* 如果构造函数没有指定线程池数量 则默认为系统core*2 ?????*/ ????EventLoopGroup acceptor=new NioEventLoopGroup();//acceptor负责监客户端连接请求 ????EventLoopGroup worker=new NioEventLoopGroup();//worker负责io读写(监听注册channel的 read/writer事件) ???????????ServerBootstrap bootstrap=new ServerBootstrap(); ????bootstrap.group(acceptor,worker) ????.channel(NioServerSocketChannel.class) ????.localAddress(new InetSocketAddress(port)) ????.childHandler(new ServerChannelInitializer(this)).option(ChannelOption.SO_BACKLOG, 128) ?????.childOption(ChannelOption.SO_KEEPALIVE, true); ????try { ???????ChannelFuture channelFuture= bootstrap.bind(port).sync(); ???????????????System.out.println("服务器已启动"); ???????//将阻塞 直到服务器端关闭或者手动调用 ??????// channelFuture.channel().closeFuture().sync(); ???????//释放资源 ???????//acceptor.shutdownGracefully(); ???????//worker.shutdownGracefully(); ???} catch (InterruptedException e) { ???????// TODO Auto-generated catch block ???????e.printStackTrace(); ???} } }
ServerChannelInitializer实现
package com.liqiang.nettyTest2;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { ???private Server server; ???public ServerChannelInitializer(Server server) { ???????this.server=server; ???} ???@Override ???protected void initChannel(SocketChannel channel) throws Exception { ???????// TODO Auto-generated method stub ???????channel.pipeline() ???????.addLast("decoder",new StringDecoder())//接收到数据 自动将将buffer转换为String 避免自己再转 ???????.addLast("encoder",new StringEncoder())//发送数据 可以直接发送String 框架内部转换为buffer传输 ???????.addLast(new ServerHandle(server)); ???}}
decoder和ecoder都是和ServerHandle间接继承了ChannelInboundHandlerAdapter
表示addLast可以注册多个管道 相当于责任链模式的变种 pipeline注册的Handle都会根据顺序被执行
ServerHandle实现
package com.liqiang.nettyTest2;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class ServerHandle extends ChannelInboundHandlerAdapter { ???private Server server; ?????public ServerHandle(Server server) { ???????// TODO Auto-generated constructor stub ?????????this.server=server; ???} ???// 建立连接时回调 ???@Override ???public void channelActive(ChannelHandlerContext ctx) throws Exception { ???????// TODO Auto-generated method stub ???????System.out.println("有客户端建立连接了"); ???????server.addClient(ctx); ???????//ctx.fireChannelActive();//pipeline可以注册多个handle ?这里可以理解为是否通知下一个Handle继续处理 ???} ???//接收到客户端发送消息时回调 ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????// TODO Auto-generated method stub ??????System.out.println("server接收到客户端发送信息:"+msg.toString()); ??????//ctx.fireChannelRead(msg);pipeline可以注册多个handle ?这里可以理解为是否通知下一个Handle继续处理 ???} ???//通信过程中发生异常回调 ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????// TODO Auto-generated method stub ???????//super.exceptionCaught(ctx, cause); ???????ctx.close();//发生异常关闭通信通道 ???????cause.printStackTrace();//打印错误信息 ???????//ctx.fireExceptionCaught(cause);pipeline可以注册多个handle ?这里可以理解为是否通知下一个Handle继续处理 ???}}
Client端实现
package com.liqiang.nettyTest2;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.concurrent.EventExecutorGroup;public class Client { ???private String ip;// ip ???private int port;// 端口 ???private boolean isConnection = false; ???private ChannelHandlerContext serverChannel;//服务器端的通信通道 ???public Client(String ip, int port) { ???????this.ip = ip; ???????this.port = port; ???} ???// 与服务器建立连接 ???public void connection() { ???????EventLoopGroup group = new NioEventLoopGroup();// 服务器监听服务器发送信息 ???????Bootstrap bootstrap = new Bootstrap(); ???????bootstrap.group(group).channel(NioSocketChannel.class) ?
??????????.option(ChannelOption.TCP_NODELAY, true) ???????????????.handler(new ClientChannelInitializer(this)); ???????try { ???????????ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); ???????????// System.out.println(channelFuture.isSuccess()); ???????????// 监听是否连接成功 ???????????while (!isConnection) { ???????????????Thread.sleep(1000); ???????????} ???????????// channelFuture.channel().closeFuture().sync(); 断开连接才会往下执行 ???????} catch (InterruptedException e) { ???????????// TODO Auto-generated catch block ???????????System.out.println("连接服务器失败"); ???????} ???} ???public boolean isConnection() { ???????return isConnection; ???} ???public void setConnection(boolean isConnection) { ???????this.isConnection = isConnection; ???} ???public void sendMsg(String msg) { ???????serverChannel.writeAndFlush(msg); ???} ???public ChannelHandlerContext getServerChannel() { ???????return serverChannel; ???} ???public void setServerChannel(ChannelHandlerContext serverChannel) { ???????this.serverChannel = serverChannel; ???}}
ClientChannelInitializer
package com.liqiang.nettyTest2;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> { ???private Client client; ???public ?ClientChannelInitializer(Client client) { ???????// TODO Auto-generated constructor stub ???????this.client=client; ???} ???@Override ???protected void initChannel(SocketChannel socketChannel) throws Exception { ???????// TODO Auto-generated method stub ???????socketChannel.pipeline() ???????.addLast("decoder",new StringDecoder())//注册String编码器和解码器 会在发送数据和接收数据通过编码器和解码器转换为String ???????.addLast("encoder",new StringEncoder()) ???????.addLast(new ClientHandle(client));//注册处理器 ???????????}}
ClientHandle
package com.liqiang.nettyTest2;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.http.cors.CorsHandler;public class ClientHandle extends ChannelInboundHandlerAdapter { ???????Client client; ???public ?ClientHandle(Client client) { ???????// TODO Auto-generated constructor stub ??????this.client=client; ???} ???????//建立连接时回调 ???@Override ???public void channelActive(ChannelHandlerContext ctx) throws Exception { ???????// TODO Auto-generated method stub ???????//System.out.println("与服务器建立连接成功"); ???????client.setServerChannel(ctx); ???????client.setConnection(true); ???????//ctx.fireChannelActive();//如果注册多个handle 下一个handel的事件需要触发需要调用这个方法 ???????????} ???//读取服务器发送信息时回调 ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????// TODO Auto-generated method stub ???????System.out.println(msg.toString()); ???} ???//发生异常时回调 ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????// TODO Auto-generated method stub ???????cause.printStackTrace();//打印异常 ???????ctx.close();//关闭连接 ???}}
测试
package com.liqiang.nettyTest2;public class nettyMain { ???public static void main(String[] args) { ???????new Thread(new Runnable() { ???????????@Override ???????????public void run() { ???????????????// TODO Auto-generated method stub ???????????????Server server = new Server(8081); ???????????????server.start(); ???????????????try { ???????????????????Thread.sleep(5000);//5秒后测试服务器端广播功能 ???????????????????server.sendAll("服务器端广播信息"); ???????????????} catch (InterruptedException e) { ???????????????????// TODO Auto-generated catch block ???????????????????e.printStackTrace(); ???????????????} ???????????} ???????}).start(); ???????new Thread(new Runnable() { ???????????????????????@Override ???????????public void run() { ???????????????// TODO Auto-generated method stub ???????????????Client client1=new Client("127.0.0.1", 8081); ???????????????client1.connection(); ???????????????client1.sendMsg("我是客户端1"); ???????????????????????????????Client client2=new Client("127.0.0.1", 8081); ???????????????client2.connection(); ???????????????client2.sendMsg("我是客户端2"); ???????????} ???????}).start(); ???}}
输出
通过netty可以轻松实现点对点 ?一对多 ?广播等功能 ??但是对于netty的学习不应止与此
netty helloWord (一)
原文地址:https://www.cnblogs.com/LQBlog/p/9141545.html