客户端:
package com.server;import java.net.Socket;import java.nio.ByteBuffer;public class Client { ???public static void main(String[] args) throws Exception { ???????Socket socket = new Socket("127.0.0.1", 10101); ???????????????String message = "hello"; ???????????????byte[] bytes = message.getBytes(); ???????????????ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length); ???????buffer.putInt(bytes.length);//netty是write,ByteBuffer是nio的,所以用put。 ???????buffer.put(bytes); ???????????????byte[] array = buffer.array(); ???????????????????for(int i=0; i<5; i++){ ???????????socket.getOutputStream().write(array); ???????} ???????????????????socket.close(); ???}}
服务端:
package com.server;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;import org.jboss.netty.handler.codec.string.StringDecoder;import org.jboss.netty.handler.codec.string.StringEncoder;public class Server { ???public static void main(String[] args) { ???????//服务类 ???????ServerBootstrap bootstrap = new ServerBootstrap(); ???????//boss线程监听端口,worker线程负责数据读写 ???????ExecutorService boss = Executors.newCachedThreadPool(); ???????ExecutorService worker = Executors.newCachedThreadPool(); ???????//设置niosocket工厂 ???????bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker)); ???????//设置管道的工厂 ???????bootstrap.setPipelineFactory(new ChannelPipelineFactory() { ???????????@Override ???????????public ChannelPipeline getPipeline() throws Exception { ???????????????ChannelPipeline pipeline = Channels.pipeline(); ???????????????pipeline.addLast("decoder", new MyDecoder()); ???????????????pipeline.addLast("handler1", new HelloHandler()); ???????????????return pipeline; ???????????} ???????}); ???????bootstrap.bind(new InetSocketAddress(10101)); ???????System.out.println("start!!!"); ???}}
package com.server;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.frame.FrameDecoder;public class MyDecoder extends FrameDecoder { ???@Override ?//FrameDecoder的decode方法 ???protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { ???????//buffer是netty的ChannelBuffer ???????if(buffer.readableBytes() > 4){ //必须大于基本的最短长度4个字节 ???????????if(buffer.readableBytes() > 2048){ ???????????????buffer.skipBytes(buffer.readableBytes()); ???????????} ???????????//标记 ???????????buffer.markReaderIndex(); ???????????//长度 ???????????int length = buffer.readInt(); ???????????//buffer里面剩余的数据小于长度 ???????????if(buffer.readableBytes() < length){ ???????????????//前面做了标记,这里可以还原 ???????????????buffer.resetReaderIndex(); ???????????????//缓存当前剩余的buffer数据,等待剩下数据包到来 ???????????????return null; ???????????} ???????????//大于长度,开始读数据 ???????????byte[] bytes = new byte[length]; ???????????buffer.readBytes(bytes); ???????????//往下传递对象给HelloHandler,这次的buffer处理完了,后面在来buffer的时候FrameDecoder会帮我们循环读取, ???????????return new String(bytes); ???????} ???????//缓存当前剩余的buffer数据,等待剩下数据包到来(FrameDecoder帮我们实现的), ???????return null; ???}}
package com.server;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class HelloHandler extends SimpleChannelHandler { ???????private int count = 1;//单线程的没有并发问题 ???@Override ???public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ???????System.out.println(e.getMessage() + " ?" +count); ???????count++; ???}}
netty10---分包粘包
原文地址:https://www.cnblogs.com/yaowen/p/9063227.html