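Calling a Netty server over HTTP, having the server call its clients, and returning a pseudo-synchronous response. Using ProtoBuf to solve the sticky-packet and half-packet problems.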

Published: 2023-09-06 01:49 | Editor: 彭小芳 | Keywords: none

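The actual situation: the company needs to develop an interface for a new product, with the following requirements:

1. There is a hardware device that the customer simply plugs into a computer over USB. The device's functionality is quite limited, so a server program is developed to support its business functions.

2. Solution: use a Socket to call the device.

3. To extend the device's functionality, add a Socket client that connects to the Socket server.

4. HTTP requests with synchronous responses.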

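Notes on testing:

1. The Netty server was developed on Ubuntu and uses Epoll.

2. The HTTP request tests are best run on Linux. Under high concurrency, Windows may exhaust its available ports, after which HttpClient or RestTemplate can no longer send requests.

3. The ProtoBuf plugin works on both Windows and Linux. On Linux it automatically downloads protoc-3.5.1-linux-x86_64.exe.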

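The basic flow is as follows: an HTTP request calls the Netty server, the server forwards the message to the connected socket client, the client writes the message back, and the server hands that reply to the waiting HTTP request as a pseudo-synchronous response.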

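Solution:

1. Use the Netty framework

2. Use ProtoBuf, together with Netty's ProtoBuf support, to solve the half-packet problem

3. Use a Future to implement the pseudo-synchronous response

4. Spring Boot + Jetty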
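The third item can be sketched with the standard library alone. The example below is only an illustration under stated assumptions: it swaps in `CompletableFuture` for the custom `ResponseFuture` that this article builds later, and the names `PseudoSyncSketch`, `PENDING`, `call`, `onReply`, and `FAKE_CLIENT` are hypothetical. A single-thread executor stands in for the remote socket client. The idea is that the calling thread registers a future under a message id, sends the message, and blocks until another thread completes the future for the same id.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class PseudoSyncSketch {

    // Pending requests keyed by message id (all names here are hypothetical).
    public static final Map<Long, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();
    private static final AtomicLong IDS = new AtomicLong();

    // Stands in for the remote socket client, which answers on a different thread.
    private static final ExecutorService FAKE_CLIENT = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Called by the "HTTP" thread: send, then block until the reply arrives.
    // Returns null if no reply arrives within timeoutMs.
    public static String call(String body, long timeoutMs) {
        long id = IDS.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        PENDING.put(id, future);
        try {
            // Asynchronous "network" round trip.
            FAKE_CLIENT.submit(() -> onReply(id, "echo:" + body));
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Always remove the entry so the map cannot accumulate stale futures.
            PENDING.remove(id);
        }
    }

    // Called by the "I/O" thread when a reply carrying a known id comes back.
    public static void onReply(long id, String reply) {
        CompletableFuture<String> future = PENDING.get(id);
        if (future != null) {
            future.complete(reply);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("hello", 1000)); // prints "echo:hello"
    }
}
```

Removing the entry in `finally` is a design choice of this sketch: it keeps the map bounded even when a reply never arrives.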

Add the ProtoBuf dependencies and plugin to pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <grpc.version>1.11.0</grpc.version>
    <protobuf.version>3.5.1</protobuf.version>
</properties>

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protobuf.version}</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty</artifactId>
    <version>${grpc.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>${grpc.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.googlecode.protobuf-java-format</groupId>
    <artifactId>protobuf-java-format</artifactId>
    <version>1.4</version>
</dependency>

Plugin

<build>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.5.0.Final</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.5.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Exclude Tomcat and use Jetty

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jetty</artifactId>
</dependency>

Write the proto file: create a folder named proto under /src/main, then create a Message.proto file in it.

File contents

syntax = "proto3";

option java_package = "com.lzw.netty";
option java_outer_classname = "MessageProto";

message Message {
    int32 type = 1;
    sfixed64 id = 2;
    string msgBody = 3;

    enum Type {
        ACTIVE = 0;
        MESSAGE = 1;
    }
}

Generate the Java file

Find the generated file in the output directory and move it into the package where you need it.

Server code

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
public class EchoServer {

    // Cache of pending ResponseFutures, keyed by message id.
    // A ConcurrentHashMap is used because HTTP threads and Netty event-loop threads access it concurrently.
    public static Map<Long, ResponseFuture<MessageProto.Message>> responseFutureMap = new ConcurrentHashMap<>();

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
        EventLoopGroup workerGroup = new EpollEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup).channel(EpollServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new MyServerChannelInitializer());
        try {
            ChannelFuture f = bootstrap.bind().sync();
            // Clean up stale entries left behind by unforeseen failures
            f.channel().eventLoop().scheduleAtFixedRate(() -> {
                long nowTime = System.currentTimeMillis();
                // removeIf avoids removing entries from the map while a stream is iterating over it
                responseFutureMap.entrySet().removeIf(e -> (nowTime - e.getValue().getBeginTime()) > 60000);
            }, 300, 300, TimeUnit.SECONDS);
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }
}

ContextHelper caches each client's ChannelHandlerContext

import io.netty.channel.ChannelHandlerContext;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description: caches the clients' ChannelHandlerContext
 */
public class ContextHelper {

    private final static Map<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>();

    public static Map<String, ChannelHandlerContext> getClientMap() {
        return Collections.unmodifiableMap(clientMap);
    }

    public static ChannelHandlerContext get(String id) {
        return clientMap.get(id);
    }

    public static void add(String id, ChannelHandlerContext ctx) {
        clientMap.put(id, ctx);
    }

    public static void remove(String id) {
        clientMap.remove(id);
    }
}

MyServerHandler

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
@Slf4j
@ChannelHandler.Sharable
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {
        String message = msg.getMsgBody();
        if ((MessageProto.Message.Type.ACTIVE_VALUE) == msg.getType()) {
            Attribute<String> attribute = channelHandlerContext.channel().attr(AttributeKey.valueOf("userName"));
            // Once connected, read the user name from the message and store it as a channel attribute
            String userName = message.split(":")[1];
            attribute.setIfAbsent(userName);
            // Cache the ChannelHandlerContext
            ContextHelper.add(userName, channelHandlerContext);
        } else if (MessageProto.Message.Type.MESSAGE_VALUE == msg.getType()) {
            ResponseFuture<MessageProto.Message> result = EchoServer.responseFutureMap.get(msg.getId());
            if (result == null) {
                // No pending future for this id; return early to avoid a NullPointerException
                log.warn("result is null ! msgId:" + msg.getId());
                return;
            }
            // The message body reads "Received successfully! msg:" followed by the original message
            MessageProto.Message message1 = MessageProto.Message.newBuilder()
                    .setId(msg.getId())
                    .setType(MessageProto.Message.Type.MESSAGE_VALUE)
                    .setMsgBody("接收成功!msg:" + message)
                    .build();
            result.setResponse(message1);
        }
//        System.out.println("Client->Server:" + channelHandlerContext.channel().remoteAddress() + " send " + msg.getMsgBody());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));
        String userName = attribute.get();
        // The attribute is unset if the client disconnected before sending its ACTIVE message
        if (userName != null) {
            ContextHelper.remove(userName);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
    }
}

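ChannelInitializer: add Netty's ProtoBuf frame splitting, plus the ProtoBuf encoder and decoder

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // Inbound: split the byte stream into frames using the varint32 length prefix
                .addLast(new ProtobufVarint32FrameDecoder())
                // Inbound: decode each frame into a MessageProto.Message
                .addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()))
                // Outbound: prepend the varint32 length prefix
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                // Outbound: encode the message into bytes
                .addLast(new ProtobufEncoder())
                .addLast(new MyServerHandler());
    }
}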
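To make the framing step concrete, here is a standard-library sketch of the technique that `ProtobufVarint32LengthFieldPrepender` and `ProtobufVarint32FrameDecoder` apply: each message is prefixed with its length as a base-128 varint, and the receiver emits a frame only once all of its bytes have arrived. This is not Netty's implementation. The class `Varint32FramingSketch` and its `frame` and `Decoder.feed` members are hypothetical names, and the payloads are plain strings rather than ProtoBuf messages.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Varint32FramingSketch {

    // Prefix the payload with its length encoded as a base-128 varint.
    public static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = payload.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80); // low 7 bits, continuation bit set
            len >>>= 7;
        }
        out.write(len);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Accumulates raw bytes across reads and emits only complete frames.
    public static class Decoder {
        private byte[] buf = new byte[0];

        public List<byte[]> feed(byte[] chunk) {
            byte[] merged = Arrays.copyOf(buf, buf.length + chunk.length);
            System.arraycopy(chunk, 0, merged, buf.length, chunk.length);
            buf = merged;

            List<byte[]> frames = new ArrayList<>();
            int pos = 0;
            while (true) {
                int len = 0, shift = 0, i = pos;
                boolean headerDone = false;
                // Read the varint32 length prefix (at most 5 bytes).
                while (i < buf.length && shift < 35 && !headerDone) {
                    byte b = buf[i++];
                    len |= (b & 0x7F) << shift;
                    shift += 7;
                    headerDone = (b & 0x80) == 0;
                }
                if ((!headerDone && shift >= 35) || len < 0) {
                    throw new IllegalStateException("malformed varint32 length");
                }
                if (!headerDone || buf.length - i < len) {
                    break; // half packet: keep the bytes and wait for more
                }
                frames.add(Arrays.copyOfRange(buf, i, i + len));
                pos = i + len; // continue: the next frame may be stuck to this one
            }
            buf = Arrays.copyOfRange(buf, pos, buf.length);
            return frames;
        }
    }

    public static void main(String[] args) {
        byte[] a = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = frame("world!".getBytes(StandardCharsets.UTF_8));
        byte[] wire = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, wire, a.length, b.length);

        Decoder decoder = new Decoder();
        // First read delivers only 3 bytes: a half packet, so nothing is emitted.
        System.out.println(decoder.feed(Arrays.copyOfRange(wire, 0, 3)).size()); // prints 0
        // Second read delivers the rest: the tail of frame one stuck to all of frame two.
        for (byte[] f : decoder.feed(Arrays.copyOfRange(wire, 3, wire.length))) {
            System.out.println(new String(f, StandardCharsets.UTF_8)); // hello, then world!
        }
    }
}
```

Because the decoder keeps leftover bytes between calls, it does not matter how TCP splits or merges the writes. Both the half-packet and the sticky-packet case reduce to the same loop.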

ResponseFuture

import lombok.NoArgsConstructor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@NoArgsConstructor
public class ResponseFuture<T> implements Future<T> {

    // Each request has exactly one response, so the CountDownLatch starts at 1.
    private CountDownLatch latch = new CountDownLatch(1);

    // The response; volatile so that isDone() sees a value written by another thread.
    private volatile T response;

    // Time at which the Future was created, used to decide whether it has timed out
    private long beginTime = System.currentTimeMillis();

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return response != null;
    }

    // Get the response, blocking until one is available.
    @Override
    public T get() throws InterruptedException {
        latch.await();
        return this.response;
    }

    // Get the response, blocking until one is available or the timeout elapses.
    // Returns null on timeout.
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException {
        if (latch.await(timeout, unit))
            return this.response;
        return null;
    }

    // Set the response and count down the latch to wake the requesting thread
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();
    }

    public long getBeginTime() {
        return beginTime;
    }
}
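A short usage sketch may help show how a caller is expected to treat the timed `get`. To stay self-contained it uses a stripped-down stand-in, `LatchFuture`, instead of the `ResponseFuture` class above. `LatchFutureDemo`, `LatchFuture`, and `waitFor` are hypothetical names, and the delays are arbitrary. The stand-in keeps the same contract: the timed `get` returns the response, or `null` if nothing arrives in time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchFutureDemo {

    // Minimal stand-in for ResponseFuture: one latch and one result slot.
    public static class LatchFuture<T> {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile T response;

        public void setResponse(T response) {
            this.response = response;
            latch.countDown(); // wake the waiting thread
        }

        // Returns the response, or null if none arrived within the timeout.
        public T get(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit) ? response : null;
        }
    }

    // A responder thread sets the response after delayMs; the caller waits up to timeoutMs.
    public static String waitFor(long delayMs, long timeoutMs) {
        LatchFuture<String> future = new LatchFuture<>();
        Thread responder = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
                future.setResponse("done");
            } catch (InterruptedException ignored) {
                // The demo thread simply ends if interrupted.
            }
        });
        responder.setDaemon(true);
        responder.start();
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(waitFor(10, 2000));  // reply arrives in time: prints "done"
        System.out.println(waitFor(2000, 50));  // reply is too late: prints "null"
    }
}
```

Since a timeout is reported as `null` rather than as an exception, a caller has to check for `null` explicitly before using the result.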

ApplicationStartup: start the Netty server once Spring Boot has fully started

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * User: laizhenwei
 * Date: 2018-03-26 Time: 21:46
 * Description:
 */
@Component
public class ApplicationStartup implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        new EchoServer(5000).start();
    }
}

Client: EchoClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.util.AttributeKey;

import java.net.InetSocketAddress;

/**
 * User: laizhenwei
 * Date: 2018-03-27 Time: 21:50
 * Description:
 */
public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start(String userName) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host, port))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {
                        socketChannel.attr(AttributeKey.valueOf("userName")).setIfAbsent(userName);
                        socketChannel.pipeline()
                                .addLast(new ProtobufVarint32FrameDecoder())
                                .addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()))
                                .addLast(new ProtobufVarint32LengthFieldPrepender())
                                .addLast(new ProtobufEncoder())
                                .addLast(new MyClientHandler());
                    }
                });
        try {
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) {
        threadRun("Athos");
        threadRun("Nero");
        threadRun("Dante");
        threadRun("Vergil");
        threadRun("lzw");
        threadRun("Churchill");
        threadRun("Peter");
        threadRun("Bob");
    }

    // Run one client per user name, each on its own thread
    private static void threadRun(String userName) {
        new Thread(() -> {
            try {
                new EchoClient("192.168.1.8", 5000).start(userName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

MyClientHandler

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

/**
 * User: laizhenwei
 * Date: 2018-04-09 Time: 11:20
 * Description:
 */
@ChannelHandler.Sharable
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProto.Message> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // On connect, announce this client's user name to the server
        Attribute<Object> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));
        String m = "userName:" + attribute.get();
        MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
        builder.setType(MessageProto.Message.Type.ACTIVE_VALUE).setMsgBody(m);
        ctx.writeAndFlush(builder.build());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {
        MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
        // Write the received message back to the server
        builder.setId(msg.getId()).setType(MessageProto.Message.Type.MESSAGE_VALUE).setMsgBody(msg.getMsgBody());
        channelHandlerContext.channel().writeAndFlush(builder.build());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

JunitTest

import com.google.gson.Gson;
import org.junit.Test;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class JunitTest {

    @Test
    public void testRest() throws InterruptedException {
        final Gson gson = new Gson();
        AtomicLong atomicLong = new AtomicLong(0);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(512);
        executor.setThreadNamePrefix("Executor-");
        executor.setAllowCoreThreadTimeOut(false);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        String[] userNames = {"Athos", "Nero", "Dante"
                , "Vergil", "lzw", "Churchill"
                , "Peter", "Bob"};
//        String[] userNames = {"Athos"};
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.setAccept(Arrays.asList(MediaType.APPLICATION_JSON_UTF8));
        httpHeaders.add("connection", "keep-alive");
//        httpHeaders.setConnection("close");
        // Several producer threads add to this list, so it must be thread-safe
        List<CompletableFuture<Boolean>> futures = Collections.synchronizedList(new ArrayList<>());
        long begin = System.nanoTime();
        Arrays.stream(userNames).forEach(userName -> new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    long currentId = atomicLong.getAndIncrement();
                    MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
                    params.add("userName", userName);
                    // The message reads "Hello!" followed by the id
                    params.add("msg", "你好啊!" + currentId);
                    HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, httpHeaders);
                    String response = restTemplate.postForObject("http://192.168.91.130:8010/process", httpEntity, String.class);
                    if (response != null) {
                        Map<String, Object> responseMap;
                        responseMap = gson.fromJson(response, HashMap.class);
                        // The reply must echo exactly the message this task sent
                        return responseMap.get("msgBody").equals("接收成功!msg:你好啊!" + currentId);
                    }
                    return false;
                }, executor));
            }
        }).start());
        // Wait until every producer thread has submitted all of its requests
        while (futures.size() != (100000 * userNames.length)) {
            TimeUnit.MILLISECONDS.sleep(500);
        }
        List<Boolean> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        // Elapsed time in milliseconds
        System.out.println((System.nanoTime() - begin) / 1000000);
        // Print any mismatched responses; no output here means every response matched
        result.stream().filter(r -> !r).forEach(r -> System.out.println(r));
    }
}

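1. Start the Netty server

2. Start the Netty client

3. Start N instances of JunitTest: 5 on Windows and 5 on Linux

Looking at the server output, the round trip from request to response is very fast.

Across the multiple client threads, no false appears in the output, which shows that the pseudo-synchronous response works.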

Original article: https://www.cnblogs.com/sweetchildomine/p/8798493.html
