???????ByteBuf cumulation; ???private boolean first; ???????@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????if (msg instanceof ByteBuf) { ???????????CodecOutputList out = CodecOutputList.newInstance(); ???????????try { ???????????????ByteBuf data = (ByteBuf) msg; ???????????????// channel 第一次读入数据时, cumulation == null ???????????????// 将 cumulation 中的数据读完之后,也会将 cumulation 的内存释放, 并置为 null ???????????????first = cumulation == null; ???????????????if (first) { ???????????????????cumulation = data; ???????????????} else { ???????????????????// 后续读入数据后 ???????????????????cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); ???????????????} ???????????????// 解码传入的 ByteBuf ???????????????callDecode(ctx, cumulation, out); ???????????} catch (DecoderException e) { ???????????????throw e; ???????????} catch (Throwable t) { ???????????????throw new DecoderException(t); ???????????} finally { ????????????????// 如果 ByteBuf 不为空,并且数据已经读完了 ???????????????// 则释放内存 ???????????????if (cumulation != null && !cumulation.isReadable()) { ???????????????????numReads = 0; ???????????????????cumulation.release(); ???????????????????cumulation = null; ???????????????} else if (++ numReads >= discardAfterReads) { ?????????????????????numReads = 0; ???????????????????discardSomeReadBytes(); ???????????????} ???????????????int size = out.size(); ???????????????decodeWasNull = !out.insertSinceRecycled(); ???????????????fireChannelRead(ctx, out, size); ???????????????out.recycle(); ???????????} ???????} else { ???????????ctx.fireChannelRead(msg); ???????} ???} ???????????// 解码 ByteBuf ??????protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { ???????try { ???????????// 如果 ByteBuf 有可读数据就继续解码 ???????????while (in.isReadable()) { ???????????????????????????????// 如果 out 中有内容,则将内容分发出去,然后清空 out, 并将 outSize 置为 0 ???????????????int outSize = out.size(); ???????????????if (outSize > 0) { ???????????????????// 将 out 中的所有消息分发出去 ???????????????????fireChannelRead(ctx, out, outSize); ???????????????????out.clear(); ???????????????????if (ctx.isRemoved()) { ???????????????????????break; ???????????????????} ???????????????????outSize = 0; ???????????????} ???????????????// 解码之前的可读长度 ???????????????int oldInputLength = in.readableBytes(); ???????????????// 调用自定义的解码器 ???????????????decode(ctx, in, out); ???????????????if (ctx.isRemoved()) { ???????????????????break; ???????????????} ???????????????// 由上面可知,outSize 为0 , ???????????????// 没有读取数据,没有得到消息对象,则跳出循环 ???????????????if (outSize == out.size()) { ???????????????????if (oldInputLength == in.readableBytes()) { ???????????????????????break; ???????????????????} else { ???????????????????????continue; ???????????????????} ???????????????} ????????????????// 没有读取数据,却得到自定义的消息对象,抛出异常 ???????????????if (oldInputLength == in.readableBytes()) { ???????????????????throw new DecoderException( ???????????????????????????StringUtil.simpleClassName(getClass()) + ???????????????????????????".decode() did not read anything but decoded a message."); ???????????????} ???????????????if (isSingleDecode()) { ???????????????????break; ???????????????} ???????????} ???????} catch (DecoderException e) { ???????????throw e; ???????} catch (Throwable cause) { ???????????throw new DecoderException(cause); ???????} ???}
Netty:解码器
原文地址:https://www.cnblogs.com/virgosnail/p/10492157.html