分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 运营维护

【netty这点事儿】ByteBuf 的使用模式

发布时间:2023-09-06 01:44责任编辑:胡小海关键词:暂无标签

堆缓冲区

最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。 这种模式被称为支撑数组
(backing array), 它能在没有使用池化的情况下提供快速的分配和释放。

直接缓冲区

直接缓冲区的内容将驻留在常规的会被垃圾回收的堆之外。直接缓冲区对于网络数据传输是理想的选择。因为如果你的数据包含在一个在堆上分配的缓冲区中,那么事实上,在通过套接字发送它之前,JVM将会在内部把你的缓冲区复制到一个直接缓冲区中。
直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。所以该类型内存只在收发数据包内存使用效率较高,在netty应用层中不合适。

复合缓冲区

让我们考虑一下一个由两部分——头部和主体——组成的将通过 HTTP 协议传输的消息。这两部分由应用程序的不同模块产生, 将会在消息被发送的时候组装。该应用程序可以选择为多个消息重用相同的消息主体。当这种情况发生时,对于每个消息都将会创建一个新的头部。
因为我们不想为每个消息都重新分配这两个缓冲区,所以使用 CompositeByteBuf 是一个
完美的选择。
需要注意的是, Netty使用了CompositeByteBuf来优化套接字的I/O操作,尽可能地消除了
由JDK的缓冲区实现所导致的性能以及内存使用率的惩罚。这种优化发生在Netty的核心代码中,因此不会被暴露出来,但是你应该知道它所带来的影响。

ByteBuf 的分配方式

池化的分配 PooledByteBufAllocator是ByteBufAllocator的默认方式

可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到
ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。
池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。此实现使用了一种称为jemalloc的已被大量现代操作系统所采用的高效方法来分配内存。
该方式在netty中是默认方式。

非池化的分配 UnpooledByteBufAllocator

可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提供了一个简单的称为 Unpooled 的工具类, 它提供了静态的辅助方法来创建未池化的 ByteBuf实例。

依托http进行性能测试

netty http 代码

HttpServer.java
package http.server;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpRequestDecoder;import io.netty.handler.codec.http.HttpResponseEncoder;public class HttpServer { ???private static Log log = LogFactory.getLog(HttpServer.class); ???????public void start(int port) throws Exception { ???????EventLoopGroup bossGroup = new NioEventLoopGroup(); ???????EventLoopGroup workerGroup = new NioEventLoopGroup(); ???????try { ???????????ServerBootstrap b = new ServerBootstrap(); ???????????b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) ???????????????????.childHandler(new ChannelInitializer<SocketChannel>() { ???????????????????????????????@Override ???????????????????????????????public void initChannel(SocketChannel ch) throws Exception { ???????????????????????????????????// server端发送的是httpResponse,所以要使用HttpResponseEncoder进行编码 ???????????????????????????????????ch.pipeline().addLast(new HttpResponseEncoder()); ???????????????????????????????????// server端接收到的是httpRequest,所以要使用HttpRequestDecoder进行解码 ???????????????????????????????????ch.pipeline().addLast(new HttpRequestDecoder()); ???????????????????????????????????ch.pipeline().addLast(new HttpServerInboundHandler()); ???????????????????????????????} ???????????????????????????}).option(ChannelOption.SO_BACKLOG, 128) ????????????????????.childOption(ChannelOption.SO_KEEPALIVE, true); ???????????ChannelFuture f = b.bind(port).sync(); ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????workerGroup.shutdownGracefully(); ???????????bossGroup.shutdownGracefully(); ???????} ???} ???public static void main(String[] args) throws Exception { ???????HttpServer server = new HttpServer(); ???????log.info("Http Server listening on 5656 ..."); ???????server.start(5656); ???}}

HttpServerInboundHandler.java

package http.server;import static io.netty.handler.codec.http.HttpResponseStatus.OK;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.http.DefaultFullHttpResponse;import io.netty.handler.codec.http.FullHttpResponse;import io.netty.handler.codec.http.HttpHeaderNames;import io.netty.handler.codec.http.HttpVersion;public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter { ???private static Log log = LogFactory.getLog(HttpServerInboundHandler.class); ???// private HttpRequest request; ???// static ByteBuf buf = Unpooled.wrappedBuffer("hello world".getBytes()); ???byte[] bs = "hello world".getBytes(); ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????// if (msg instanceof HttpRequest) { ???????// request = (HttpRequest) msg; ???????// ???????// String uri = request.uri(); ???????// // System.out.println("Uri:" + uri); ???????// } ???????// if (msg instanceof HttpContent) { ???????// HttpContent content = (HttpContent) msg; ???????// ByteBuf buf = content.content(); ???????// // System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8)); ???????// buf.release(); ???????// String res = "hello world."; ???????// ByteBuf buf = Unpooled.wrappedBuffer(bs); ???????// ByteBuf buf = Unpooled.directBuffer(); ???????// ByteBuf buf = Unpooled.buffer(); ???????// ByteBuf buf = ctx.alloc().heapBuffer();// 池化堆内存 ???????// ByteBuf buf = ctx.alloc().directBuffer(); // 池化直接内存 ???????// ByteBuf buf = Unpooled.buffer();// 非池化堆内存 ???????ByteBuf buf = Unpooled.directBuffer();// 非池化堆内存 ???????buf.writeBytes(bs); ???????FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK, buf); ???????// response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); ???????response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes()); ???????/* ????????* if (HttpHeaders.isKeepAlive(request)) { response.headers().set(CONNECTION, Values.KEEP_ALIVE); } ????????*/ ???????ctx.write(response); ???????// ctx.flush(); ???????// } ???} ???@Override ???public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ???????ctx.flush(); ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ???????log.error(cause.getMessage()); ???????ctx.close(); ???}}

池化堆内存 ctx.alloc().heapBuffer()

Running 20s test @ http://127.0.0.1:5656/ ?4 threads and 100 connections ?Thread Stats ??Avg ?????Stdev ????Max ??+/- Stdev ???Latency ????3.76ms ???9.31ms 180.34ms ??92.96% ???Req/Sec ??138.20k ???43.16k ?210.22k ???66.50% ?10957832 requests in 20.09s, 522.51MB readRequests/sec: 545332.08Transfer/sec: ????26.00MBreal ???0m20.104suser ???0m10.441ssys 0m44.703s

池化直接内存 ctx.alloc().directBuffer()

Running 20s test @ http://127.0.0.1:5656/ ?4 threads and 100 connections ?Thread Stats ??Avg ?????Stdev ????Max ??+/- Stdev ???Latency ????4.47ms ???9.99ms 149.70ms ??91.76% ???Req/Sec ??138.51k ???41.31k ?209.94k ???63.38% ?10981466 requests in 20.09s, 523.64MB readRequests/sec: 546684.37Transfer/sec: ????26.07MBreal ???0m20.098suser ???0m10.890ssys 0m45.081s

非池化堆内存 Unpooled.buffer()

Running 20s test @ http://127.0.0.1:5656/ ?4 threads and 100 connections ?Thread Stats ??Avg ?????Stdev ????Max ??+/- Stdev ???Latency ????4.00ms ???8.72ms 150.05ms ??91.52% ???Req/Sec ??138.84k ???42.05k ?209.72k ???63.81% ?11017442 requests in 20.09s, 525.35MB readRequests/sec: 548379.99Transfer/sec: ????26.15MBreal ???0m20.101suser ???0m10.639ssys 0m45.191s

非池化直接内存 Unpooled.directBuffer()

Running 20s test @ http://127.0.0.1:5656/ ?4 threads and 100 connections ?Thread Stats ??Avg ?????Stdev ????Max ??+/- Stdev ???Latency ????3.64ms ???9.36ms 156.79ms ??92.71% ???Req/Sec ??124.55k ???33.90k ?191.90k ???71.61% ?9890536 requests in 20.07s, 471.62MB readRequests/sec: 492854.62Transfer/sec: ????23.50MBreal ???0m20.076suser ???0m9.774ssys 0m41.801s

【netty这点事儿】ByteBuf 的使用模式

原文地址:https://www.cnblogs.com/xxxuwentao/p/8487236.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved