package com.cn.codc;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.frame.FrameDecoder;import com.cn.constant.ConstantValue;import com.cn.model.Request;/** * 请求解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 ?????????| 模块号 ???????| 命令号 ?????| ?长度 ???????| ??数据 ??????| * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */public class RequestDecoder extends FrameDecoder{// FrameDecoder 这个decoder可以协助我们解决粘包分包问题 ???????/** ????* 数据包基本长度 ????*/ ???public static int BASE_LENTH = 4 + 2 + 2 + 4; ???//ChannelBuffer里面有一个读指针和写指针。读指针和写指针初始值是0,写多少数据写指针就移动多少 ???//调用readShort方法,readInt方法就会移动读指针, 0 =< readerIndex =< writerIndex ???@Override ???protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception { ???????????????//可读长度readableBytes必须大于基本长度才处理 ???????if(buffer.readableBytes() >= BASE_LENTH){ ???????????//防止socket字节流攻击 ???????????if(buffer.readableBytes() > 2048){ ???????????????buffer.skipBytes(buffer.readableBytes()); ???????????} ???????????????????????//记录包头开始的index ???????????int beginReader; ???????????????????????while(true){//循环读取,直到包头读取完毕 ???????????????beginReader = buffer.readerIndex();//获取读指针 ???????????????buffer.markReaderIndex(); ???????????????if(buffer.readInt() == ConstantValue.FLAG){ ???????????????????break; ???????????????} ???????????????????????????????//未读到包头,略过一个字节 ???????????????buffer.resetReaderIndex(); ???????????????buffer.readByte(); ???????????????????????????????//长度又变得不满足 ???????????????if(buffer.readableBytes() < BASE_LENTH){ ???????????????????return null; ???????????????} ???????????} ???????????????????????//包头读取完毕,读取模块号 ???????????short module = buffer.readShort(); ???????????//读取命令号 ???????????short cmd = buffer.readShort(); ???????????//读取长度 ???????????int length = 
buffer.readInt(); ???????????????????????//readableBytes现在可读的长度小于数据的长度。判断请求数据包数据部分是否到齐 ???????????if(buffer.readableBytes() < length){ ???????????????//还原读指针,已经读取了12个字节,但是没用,所以要还原buffer的读指针, ???????????????buffer.readerIndex(beginReader); ???????????????return null;//等待后面的数据包来 ???????????} ???????????????????????//比length要长,就读取data数据 ???????????byte[] data = new byte[length]; ???????????buffer.readBytes(data);//数据读取完毕 ???????????????????????//封装request对象继续向下传递 ???????????Request request = new Request(); ???????????request.setModule(module); ???????????request.setCmd(cmd); ???????????request.setData(data); ???????????????????????//继续往下传递 ,调用sendUpStreamEvent方法向下传递 ???????????return request; ???????????????????} ???????//长度短了,数据包不完整,需要等待后面的包来 ???????return null; ???????//FrameDecoder: return null就是等待后面的包,return一个解码的对象就是向下传递。 ???}}
package com.cn.codc;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;import com.cn.constant.ConstantValue;import com.cn.model.Request;/** * 请求编码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+ * | 包头 ?????????| 模块号 ???????| 命令号 ?????| ?长度 ???????| ??数据 ??????| * +——----——+——-----——+——----——+——----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */public class RequestEncoder extends OneToOneEncoder{ ???//把一个request对象转换成了一个ChannelBuffer二进制数据 ???@Override ???protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception { ???????Request request = (Request)(rs); ???????ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); ???????//包头,确定数据包的开始 ???????buffer.writeInt(ConstantValue.FLAG); ???????//module ???????buffer.writeShort(request.getModule()); ???????//cmd ???????buffer.writeShort(request.getCmd()); ???????//长度 ???????buffer.writeInt(request.getDataLength()); ???????//data ???????if(request.getData() != null){ ???????????buffer.writeBytes(request.getData()); ???????} ???????return buffer;//返回一个ChannelBuffer继续向下传递。 ???}}
package com.cn.codc;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.frame.FrameDecoder;import com.cn.constant.ConstantValue;import com.cn.model.Response;/** * response解码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包头 ?????????| 模块号 ???????| 命令号 ??????| ?状态码 ???| ?长度 ?????????| ??数据 ??????| * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */public class ResponseDecoder extends FrameDecoder{ ???????/** ????* 数据包基本长度 ????*/ ???public static int BASE_LENTH = 4 + 2 + 2 + 4; ???@Override ???protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception { ???????????????//可读长度必须大于基本长度 ???????if(buffer.readableBytes() >= BASE_LENTH){ ???????????????????????//记录包头开始的index ???????????int beginReader = buffer.readerIndex(); ???????????????????????while(true){ ???????????????if(buffer.readInt() == ConstantValue.FLAG){ ???????????????????break; ???????????????} ???????????} ???????????????????????//模块号 ???????????short module = buffer.readShort(); ???????????//命令号 ???????????short cmd = buffer.readShort(); ???????????//状态码 ???????????int stateCode = buffer.readInt(); ???????????//长度 ???????????int length = buffer.readInt(); ???????????????????????if(buffer.readableBytes() < length){ ???????????????//还原读指针 ???????????????buffer.readerIndex(beginReader); ???????????????return null; ???????????} ???????????????????????byte[] data = new byte[length]; ???????????buffer.readBytes(data); ???????????????????????//封装Response对象 ???????????Response response = new Response(); ???????????response.setModule(module); ???????????response.setCmd(cmd); ???????????response.setStateCode(stateCode); ???????????response.setData(data); ???????????????????????//继续往下传递 ????????????return response; ???????????????????} 
???????//数据包不完整,需要等待后面的包来 ???????return null; ???}}
package com.cn.codc;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;import com.cn.constant.ConstantValue;import com.cn.model.Response;/** * 请求编码器 * <pre> * 数据包格式 * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * | 包头 ?????????| 模块号 ???????| 命令号 ??????| ?状态码 ???| ?长度 ?????????| ??数据 ??????| * +——----——+——-----——+——----——+——----——+——-----——+——-----——+ * </pre> * 包头4字节 * 模块号2字节short * 命令号2字节short * 长度4字节(描述数据部分字节长度) */public class ResponseEncoder extends OneToOneEncoder{ ???@Override ???protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception { ???????Response response = (Response)(rs); ???????????????ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); ???????//包头 ???????buffer.writeInt(ConstantValue.FLAG); ???????//module ???????buffer.writeShort(response.getModule()); ???????//cmd ???????buffer.writeShort(response.getCmd()); ???????//状态码 ???????buffer.writeInt(response.getStateCode()); ???????//长度 ???????buffer.writeInt(response.getDataLength()); ???????//data ???????if(response.getData() != null){ ???????????buffer.writeBytes(response.getData()); ???????} ???????????return buffer; ???}}
package com.cn.constant;

/**
 * Protocol-wide constants.
 */
public interface ConstantValue {

    /**
     * Packet header flag written at the start of every packet.
     * Interface fields are implicitly {@code public static final}.
     */
    int FLAG = -32523523;
}
package com.cn.model;/** * 客户端请求服务端的对象 */public class Request { ???????/** ????* 请求模块 ????*/ ???private short module; ???????/** ????* 命令号 ????*/ ???private short cmd; ???????/** ????* 数据部分 ????*/ ???private byte[] data; ???public short getModule() { ???????return module; ???} ???public void setModule(short module) { ???????this.module = module; ???} ???public short getCmd() { ???????return cmd; ???} ???public void setCmd(short cmd) { ???????this.cmd = cmd; ???} ???public byte[] getData() { ???????return data; ???} ???public void setData(byte[] data) { ???????this.data = data; ???} ???????????public int getDataLength(){ ???????if(data == null){ ???????????return 0; ???????} ???????return data.length; ???}}
package com.cn.model;/** * 服务端返回给客户端的对象 */public class Response { ???/** ????* 请求模块 ????*/ ???private short module; ???????/** ????* 命令号 ????*/ ???private short cmd; ???????/** ????* 状态码 ????*/ ???private int stateCode; ???????/** ????* 数据部分 ????*/ ???private byte[] data; ???public short getModule() { ???????return module; ???} ???public void setModule(short module) { ???????this.module = module; ???} ???public short getCmd() { ???????return cmd; ???} ???public void setCmd(short cmd) { ???????this.cmd = cmd; ???} ???public int getStateCode() { ???????return stateCode; ???} ???public void setStateCode(int stateCode) { ???????this.stateCode = stateCode; ???} ???public byte[] getData() { ???????return data; ???} ???public void setData(byte[] data) { ???????this.data = data; ???} ???????public int getDataLength(){ ???????if(data == null){ ???????????return 0; ???????} ???????return data.length; ???}}
package com.cn.model;

/**
 * State codes carried in a response.
 * Interface fields are implicitly {@code public static final}.
 */
public interface StateCode {

    /** Success. */
    int SUCCESS = 0;

    /** Failure. */
    int FAIL = 1;
}
package com.cn.module.fuben.request;import com.cn.serial.Serializer;//FightRequest是模块名public class FightRequest extends Serializer{ ???????/** ????* 副本id ????*/ ???private int fubenId; ???????/** ????* 次数 ????*/ ???private int count; ???public int getFubenId() { ???????return fubenId; ???} ???public void setFubenId(int fubenId) { ???????this.fubenId = fubenId; ???} ???public int getCount() { ???????return count; ???} ???public void setCount(int count) { ???????this.count = count; ???} ???@Override ???protected void read() { ???????this.fubenId = readInt(); ???????this.count = readInt(); ???} ???@Override ???protected void write() { ???????writeInt(fubenId); ???????writeInt(count); ???} ???????}
package com.cn.module.fuben.response;import com.cn.serial.Serializer;public class FightResponse extends Serializer{ ???/** ????* 获取金币 ????*/ ???private int gold; ???public int getGold() { ???????return gold; ???} ???public void setGold(int gold) { ???????this.gold = gold; ???} ???@Override ???protected void read() { ???????this.gold = readInt(); ???} ???@Override ???protected void write() { ???????writeInt(gold); ???}}
package com.cn.serial;import java.nio.ByteOrder;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;/** * buff工厂 */public class BufferFactory { ???????public static ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN; ???/** ????* 获取一个buffer ????*/ ???public static ChannelBuffer getBuffer() { ???????ChannelBuffer dynamicBuffer = ChannelBuffers.dynamicBuffer(); ???????return dynamicBuffer; ???} ???/** ????* 将数据写入buffer ????*/ ???public static ChannelBuffer getBuffer(byte[] bytes) { ???????ChannelBuffer copiedBuffer = ChannelBuffers.copiedBuffer(bytes); ???????return copiedBuffer; ???}}
package com.cn.serial;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.Collection;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Map.Entry;import org.jboss.netty.buffer.ChannelBuffer;/** * 自定义序列化接口 */public abstract class Serializer { ???????????public static final Charset CHARSET = Charset.forName("UTF-8"); ???????protected ChannelBuffer writeBuffer; ???????protected ChannelBuffer readBuffer; ???????/** ????* 反序列化具体实现 ????*/ ???protected abstract void read(); ???????/** ????* 序列化具体实现 ????*/ ???protected abstract void write(); ???????/** ????* 从byte数组获取数据 ????* @param bytes ???读取的数组 ????*/ ???public Serializer readFromBytes(byte[] bytes) { ???????readBuffer = BufferFactory.getBuffer(bytes); ???????read(); ???????readBuffer.clear(); ???????return this; ???} ???????/** ????* 从buff获取数据 ????* @param readBuffer ????*/ ???public void readFromBuffer(ChannelBuffer readBuffer) { ???????this.readBuffer = readBuffer; ???????read(); ???} ???????/** ????* 写入本地buff ????* @return ????*/ ???public ChannelBuffer writeToLocalBuff(){ ???????writeBuffer = BufferFactory.getBuffer(); ???????write(); ???????return writeBuffer; ???} ???????/** ????* 写入目标buff ????* @param buffer ????* @return ????*/ ???public ChannelBuffer writeToTargetBuff(ChannelBuffer buffer){ ???????writeBuffer = buffer; ???????write(); ???????return writeBuffer; ???} ???????/** ????* 返回buffer数组 ????* ?????* @return ????*/ ???public byte[] getBytes() { ???????writeToLocalBuff(); ???????byte[] bytes = null; ???????if (writeBuffer.writerIndex() == 0) { ???????????bytes = new byte[0]; ???????} else { ???????????bytes = new byte[writeBuffer.writerIndex()]; ???????????writeBuffer.readBytes(bytes); ???????} ???????writeBuffer.clear(); ???????return bytes; ???} ???????public byte readByte() { ???????return readBuffer.readByte(); ???} ???public short readShort() { ???????return readBuffer.readShort(); ???} ???public int readInt() { ???????return 
readBuffer.readInt(); ???} ???public long readLong() { ???????return readBuffer.readLong(); ???} ???public float readFloat() { ???????return readBuffer.readFloat(); ???} ???public double readDouble() { ???????return readBuffer.readDouble(); ???} ???????public String readString() { ???????int size = readBuffer.readShort(); ???????if (size <= 0) { ???????????return ""; ???????} ???????byte[] bytes = new byte[size]; ???????readBuffer.readBytes(bytes); ???????return new String(bytes, CHARSET); ???} ???????public <T> List<T> readList(Class<T> clz) { ???????List<T> list = new ArrayList<>(); ???????int size = readBuffer.readShort(); ???????for (int i = 0; i < size; i++) { ???????????list.add(read(clz)); ???????} ???????return list; ???} ???????public <K,V> Map<K,V> readMap(Class<K> keyClz, Class<V> valueClz) { ???????Map<K,V> map = new HashMap<>(); ???????int size = readBuffer.readShort(); ???????for (int i = 0; i < size; i++) { ???????????K key = read(keyClz); ???????????V value = read(valueClz); ???????????map.put(key, value); ???????????} ???????return map; ???} ???????@SuppressWarnings("unchecked") ???public <I> I read(Class<I> clz) { ???????Object t = null; ???????if ( clz == int.class || clz == Integer.class) { ???????????t = this.readInt(); ???????} else if (clz == byte.class || clz == Byte.class){ ???????????t = this.readByte(); ???????} else if (clz == short.class || clz == Short.class){ ???????????t = this.readShort(); ???????} else if (clz == long.class || clz == Long.class){ ???????????t = this.readLong(); ???????} else if (clz == float.class || clz == Float.class){ ???????????t = readFloat(); ???????} else if (clz == double.class || clz == Double.class){ ???????????t = readDouble(); ???????} else if (clz == String.class ){ ???????????t = readString(); ???????} else if (Serializer.class.isAssignableFrom(clz)){ ???????????try { ???????????????byte hasObject = this.readBuffer.readByte(); ???????????????if(hasObject == 1){ ???????????????????Serializer temp = 
(Serializer)clz.newInstance(); ???????????????????temp.readFromBuffer(this.readBuffer); ???????????????????t = temp; ???????????????}else{ ???????????????????t = null; ???????????????} ???????????} catch (Exception e) { ???????????????e.printStackTrace(); ???????????} ????????????????????} else { ???????????throw new RuntimeException(String.format("不支持类型:[%s]", clz)); ???????} ???????return (I) t; ???} ???public Serializer writeByte(Byte value) { ???????writeBuffer.writeByte(value); ???????return this; ???} ???public Serializer writeShort(Short value) { ???????writeBuffer.writeShort(value); ???????return this; ???} ???public Serializer writeInt(Integer value) { ???????writeBuffer.writeInt(value); ???????return this; ???} ???public Serializer writeLong(Long value) { ???????writeBuffer.writeLong(value); ???????return this; ???} ???public Serializer writeFloat(Float value) { ???????writeBuffer.writeFloat(value); ???????return this; ???} ???public Serializer writeDouble(Double value) { ???????writeBuffer.writeDouble(value); ???????return this; ???} ???public <T> Serializer writeList(List<T> list) { ???????if (isEmpty(list)) { ???????????writeBuffer.writeShort((short) 0); ???????????return this; ???????} ???????writeBuffer.writeShort((short) list.size()); ???????for (T item : list) { ???????????writeObject(item); ???????} ???????return this; ???} ???public <K,V> Serializer writeMap(Map<K, V> map) { ???????if (isEmpty(map)) { ???????????writeBuffer.writeShort((short) 0); ???????????return this; ???????} ???????writeBuffer.writeShort((short) map.size()); ???????for (Entry<K, V> entry : map.entrySet()) { ???????????writeObject(entry.getKey()); ???????????writeObject(entry.getValue()); ???????} ???????return this; ???} ???public Serializer writeString(String value) { ???????if (value == null || value.isEmpty()) { ???????????writeShort((short) 0); ???????????return this; ???????} ???????byte data[] = value.getBytes(CHARSET); ???????short len = (short) data.length; 
???????writeBuffer.writeShort(len); ???????writeBuffer.writeBytes(data); ???????return this; ???} ???public Serializer writeObject(Object object) { ???????????????if(object == null){ ???????????writeByte((byte)0); ???????}else{ ???????????if (object instanceof Integer) { ???????????????writeInt((int) object); ???????????????return this; ???????????} ???????????if (object instanceof Long) { ???????????????writeLong((long) object); ???????????????return this; ???????????} ???????????if (object instanceof Short) { ???????????????writeShort((short) object); ???????????????return this; ???????????} ???????????if (object instanceof Byte) { ???????????????writeByte((byte) object); ???????????????return this; ???????????} ???????????if (object instanceof String) { ???????????????String value = (String) object; ???????????????writeString(value); ???????????????return this; ???????????} ???????????if (object instanceof Serializer) { ???????????????writeByte((byte)1); ???????????????Serializer value = (Serializer) object; ???????????????value.writeToTargetBuff(writeBuffer); ???????????????return this; ???????????} ???????????????????????throw new RuntimeException("不可序列化的类型:" + object.getClass()); ???????} ???????????????return this; ???} ???private <T> boolean isEmpty(Collection<T> c) { ???????return c == null || c.size() == 0; ???} ???public <K,V> boolean isEmpty(Map<K,V> c) { ???????return c == null || c.size() == 0; ???}}
netty8---自定义编码解码器
原文地址:https://www.cnblogs.com/yaowen/p/9063053.html