分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 教程案例

vertx的HttpServer模块

发布时间:2023-09-06 02:36责任编辑:蔡小小关键词:暂无标签

Start HttpServer

/** ??* 启动 HttpServer ??* multi instances 采用 synchronized防止线程安全问题 ??* addHandlers 方法是actor模式的实现,1 instances : 1 verticle(actor) : 1 VertxThread ??*/ public synchronized HttpServer listen(int port, String host, Handler<AsyncResult<HttpServer>> listenHandler) { ???//是否有配置requestHandler或webscoket ???if (requestStream.handler() == null && wsStream.handler() == null) { ?????throw new IllegalStateException("Set request or websocket handler first"); ???} ???if (listening) { ?????throw new IllegalStateException("Already listening"); ???} ???listenContext = vertx.getOrCreateContext(); //根据currentThread 获取Context,获取null则create ???serverOrigin = (options.isSsl() ? "https" : "http") + "://" + host + ":" + port;//判断是否启用ssl ???List<HttpVersion> applicationProtocols = options.getAlpnVersions();//获取协议版本,默认支持1.1和2.0 ???????if (listenContext.isWorkerContext()) {//是否使用 Worker Verticles ,不予许使用HTTP2.0 ?????applicationProtocols = applicationProtocols.stream().filter(v -> v != HttpVersion.HTTP_2).collect(Collectors.toList()); ???} ???sslHelper.setApplicationProtocols(applicationProtocols);//应用协议 ???????synchronized (vertx.sharedHttpServers()) {//multi instances 线程安全问题 ?????this.actualPort = port; ??????id = new ServerID(port, host);//生成服务id ?????HttpServerImpl shared = vertx.sharedHttpServers().get(id); ???????????if (shared == null || port == 0) {//判断shard Map , port是否是启动过 instances ???????/** ?????????* frist instances ?????????*/ ???????serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE); ???????ServerBootstrap bootstrap = new ServerBootstrap(); ???????//定义两个线程组,accept size 1, 重写的VertxEventLoopGroup ???????bootstrap.group(vertx.getAcceptorEventLoopGroup(), availableWorkers); ???????????????applyConnectionOptions(bootstrap);//添加Connection Accept之后的附属选项 ???????sslHelper.validate(vertx);//验证ssl相关参数 ???????bootstrap.childHandler(new ChannelInitializer<Channel>() { ?????????????????@Override ?????????/** ???????????* connection accept 调度切换线程后触发 ???????????*/ ?????????protected void initChannel(Channel ch) throws Exception { ????????? //限流策略,读大于写,导致内存无限扩大,最终 OOM ???????????if (requestStream.isPaused() || wsStream.isPaused()) { ?????????????ch.close(); //超过服务承载能力,关闭连接 ?????????????return; ???????????} ???????????ChannelPipeline pipeline = ch.pipeline(); ???????????if (sslHelper.isSSL()) {//是否启用ssl ?????????????io.netty.util.concurrent.Future<Channel> handshakeFuture; ?????????????if (options.isSni()) {//是否启用sni,单服务多证书情况 ???????????????VertxSniHandler sniHandler = new VertxSniHandler(sslHelper, vertx); ???????????????pipeline.addLast(sniHandler); ???????????????handshakeFuture = sniHandler.handshakeFuture(); ?????????????} else { ???????????????SslHandler handler = new SslHandler(sslHelper.createEngine(vertx)); ???????????????pipeline.addLast("ssl", handler); ???????????????handshakeFuture = handler.handshakeFuture(); ?????????????} ?????????????//侦听 TLS handshake ?????????????handshakeFuture.addListener(future -> { ???????????????if (future.isSuccess()) {// 握手成功 ?????????????????if (options.isUseAlpn()) {//是否启用alpn,协调使用的protocol ???????????????????//获取使用的协议 ???????????????????SslHandler sslHandler = pipeline.get(SslHandler.class); ???????????????????String protocol = sslHandler.applicationProtocol(); ???????????????????if ("h2".equals(protocol)) {//是否是http2.0 ?????????????????????handleHttp2(ch); ???????????????????} else { ?????????????????????handleHttp1(ch); ???????????????????} ?????????????????} else { ???????????????????handleHttp1(ch); ?????????????????} ???????????????} else {//握手失败 ?????????????????HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ch.eventLoop()); ?????????????????handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(future.cause())); ???????????????} ?????????????}); ???????????} else { ?????????????//是否是启用http2,通过VM Options: -Dvertx.disableH2c 设置;默认false ?????????????if (DISABLE_H2C) { ???????????????handleHttp1(ch); ?????????????} else { ???????????????IdleStateHandler idle; ???????????????if (options.getIdleTimeout() > 0) {//是否定义最大空闲时间 ?????????????????pipeline.addLast("idle", idle = new IdleStateHandler(0, 0, options.getIdleTimeout())); ???????????????} else { ?????????????????idle = null; ???????????????} ???????????????????????????????/**直接使用明文的http2.0或1.1处理*/ ???????????????pipeline.addLast(new Http1xOrH2CHandler() { ?????????????????@Override ?????????????????protected void configure(ChannelHandlerContext ctx, boolean h2c) { ???????????????????if (idle != null) { ?????????????????????//移除idleHandler,重新添加,不用注意次序 ?????????????????????pipeline.remove(idle); ???????????????????} ???????????????????if (h2c) {//判断协议,如果定义idle则会重新添加 idleHandler ?????????????????????handleHttp2(ctx.channel()); ???????????????????} else { ?????????????????????handleHttp1(ch); ???????????????????} ?????????????????} ?????????????????@Override ?????????????????public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ???????????????????if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.ALL_IDLE) { ?????????????????????ctx.close(); ???????????????????} ?????????????????} ?????????????????@Override ?????????????????public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????????????????super.exceptionCaught(ctx, cause); ???????????????????//根据eventloop选中对应的handler进行异常传播 ???????????????????HandlerHolder<HttpHandlers> handler = httpHandlerMgr.chooseHandler(ctx.channel().eventLoop()); ???????????????????handler.context.executeFromIO(() -> handler.handler.exceptionHandler.handle(cause)); ?????????????????} ???????????????}); ?????????????} ???????????} ?????????} ???????}); ???????addHandlers(this, listenContext);//添加一个httpHandler到httpHandlerMgr中 ???????try { ??????? //listen ip:port ?????????bindFuture = AsyncResolveConnectHelper.doBind(vertx, SocketAddress.inetSocketAddress(port, host), bootstrap); ?????????bindFuture.addListener(res -> { ???????????if (res.failed()) { ?????????????vertx.sharedHttpServers().remove(id); ???????????} else { ?????????????Channel serverChannel = res.result(); ?????????????HttpServerImpl.this.actualPort = ((InetSocketAddress) serverChannel.localAddress()).getPort(); ?????????????serverChannelGroup.add(serverChannel);//添加当前的ServerSocketChannel ?????????????//初始化metrcis指标 ?????????????VertxMetrics metrics = vertx.metricsSPI(); ?????????????this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null; ???????????} ?????????}); ???????} catch (final Throwable t) { ?????????if (listenHandler != null) { ???????????vertx.runOnContext(v -> listenHandler.handle(Future.failedFuture(t))); ?????????} else { ???????????log.error(t); ?????????} ?????????listening = false; ?????????return this; ???????} ???????vertx.sharedHttpServers().put(id, this);//启动的一个instances 添加到 Map中 ???????actualServer = this; ?????} else {//other instances ???????actualServer = shared; ???????this.actualPort = shared.actualPort; ???????addHandlers(actualServer, listenContext);//添加一个httpHandler到httpHandlerMgr中 ???????//初始化metrics ???????VertxMetrics metrics = vertx.metricsSPI(); ???????this.metrics = metrics != null ? metrics.createMetrics(this, new SocketAddressImpl(port, host), options) : null; ?????} ?????//服务 bind 状态 ?????actualServer.bindFuture.addListener(future -> { ???????if (listenHandler != null) { ?????????final AsyncResult<HttpServer> res; ?????????if (future.succeeded()) { ???????????res = Future.succeededFuture(HttpServerImpl.this); ?????????} else { ???????????res = Future.failedFuture(future.cause()); ???????????listening = false; ?????????} ?????????listenContext.runOnContext((v) -> listenHandler.handle(res));//回调处理 ???????} else if (future.failed()) { ?????????listening = false; ?????????log.error(future.cause()); ???????} ?????}); ???} ???return this;}

如何实现隔离(actor模型)

 private void addHandlers(HttpServerImpl server, ContextImpl context) { ???????server.httpHandlerMgr.addHandler( ?????????new HttpHandlers( ???????????requestStream.handler(), ???????????wsStream.handler(), ???????????connectionHandler, ???????????exceptionHandler == null ? DEFAULT_EXCEPTION_HANDLER : exceptionHandler) ?????????, context); } ???????????public class HttpHandlers { ?????final Handler<HttpServerRequest> requestHandler; ?????final Handler<ServerWebSocket> wsHandler; ?????final Handler<HttpConnection> connectionHandler; ?????final Handler<Throwable> exceptionHandler; ?????????/** ???????* @param requestHandler ????Http Request Handler ???????* @param wsHandler ?????????WebScoket Handler ???????* @param connectionHander ??TCP Connection Handler ???????* @param exceptionHander ???Exception Handlet ???????*/ ?????public HttpHandlers( ???????Handler<HttpServerRequest> requestHandler, ???????Handler<ServerWebSocket> wsHandler, ???????Handler<HttpConnection> connectionHandler, ???????Handler<Throwable> exceptionHandler) { ???????this.requestHandler = requestHandler; ???????this.wsHandler = wsHandler; ???????this.connectionHandler = connectionHandler; ???????this.exceptionHandler = exceptionHandler; ?????} ???} ???????public class HandlerManager<T> { ???????public synchronized void addHandler(T handler, ContextImpl context) { ???????????/** ?????????????* 添加一个eventloop(Thread)到 VertxEventLoopGroup 集合中. ?????????????* accept状态后的read/write事件,线程调度在VertxEventLoopGroup类的next方法, ?????????????* vertx重写choose策略 ?????????????*/ ???????????EventLoop worker = context.nettyEventLoop(); ???????????availableWorkers.addWorker(worker); ???????????/** ?????????????* 添加handlers,并且绑定handler和context映射关系. ?????????????* 注意部署的instances size不要超过EventLoopPoolSize, ?????????????* 否则出现 1 context : N handler(verticle) ?????????????*/ ???????????Handlers<T> handlers = new Handlers<>(); ???????????Handlers<T> prev = handlerMap.putIfAbsent(worker, handlers); ???????????if (prev != null) { ?????????????handlers = prev; ???????????} ???????????handlers.addHandler(new HandlerHolder<>(context, handler)); ???????????hasHandlers = true; ???????} ???}

Connection scheduling process:

add handler to eventloop structure:

  1. an eventloop corresponds to a handlers
  2. an eventloop corresponds to multiple verticles(HandlerHolder)

vertx的HttpServer模块

原文地址:https://www.cnblogs.com/cmfa/p/10594932.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved