分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

(3)Reactive stream 响应式流——Webflux响应式编程利器

发布时间:2023-09-06 02:31责任编辑:蔡小小关键词:WebReact

Reactive stream 响应式流

  • Reactive stream是jdk9新特性,提供了一套API,就是一种订阅发布者模式
  • 被压,背压是指在异步场景中,发布者发送事件速度远快于订阅者的处理速度的情况下,一种告诉上游的发布者降低发送速度的策略,简而言之,背压就是一种流速控制的策略。
    举个例子:假设以前是没有水龙头的,只能自来水厂主动的往用户输送水,但是不知道用户需要多少水,有了Reactive stream,就相当于有了水龙头,用户可以主动的请求用水,而自来水厂也知道了用户的需求
    示例代码(需要jdk9以上版本的支持)
import java.util.concurrent.Flow.Subscriber;import java.util.concurrent.Flow.Subscription;import java.util.concurrent.SubmissionPublisher;public class FlowDemo { ???public static void main(String[] args) throws Exception { ???????// 1. 定义发布者, 发布的数据类型是 Integer ???????// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口 ???????SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); ???????????????// 2. 定义订阅者 ???????Subscriber<Integer> subscriber = new Subscriber<Integer>() { ???????????private Subscription subscription; ???????????@Override ???????????public void onSubscribe(Subscription subscription) { ???????????????// 保存订阅关系, 需要用它来给发布者响应 ???????????????this.subscription = subscription; ???????????????// 请求一个数据 ???????????????this.subscription.request(1); ???????????} ???????????@Override ???????????public void onNext(Integer item) { ???????????????// 接受到一个数据, 处理 ???????????????System.out.println("接受到数据: " + item); ???????????????try { ???????????????????TimeUnit.SECONDS.sleep(3); ???????????????} catch (InterruptedException e) { ???????????????????e.printStackTrace(); ???????????????} ???????????????// 处理完调用request再请求一个数据 ???????????????this.subscription.request(1); ???????????????// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了 ???????????????// this.subscription.cancel(); ???????????} ???????????@Override ???????????public void onError(Throwable throwable) { ???????????????// 出现了异常(例如处理数据的时候产生了异常) ???????????????throwable.printStackTrace(); ???????????????// 我们可以告诉发布者, 后面不接受数据了 ???????????????this.subscription.cancel(); ???????????} ???????????@Override ???????????public void onComplete() { ???????????????// 全部数据处理完了(发布者关闭了) ???????????????System.out.println("处理完了!"); ???????????} ???????}; ???????// 3. 发布者和订阅者 建立订阅关系 ???????publiser.subscribe(subscriber); ???????// 4. 生产数据, 并发布 ???????// 这里忽略数据生产过程 ???????for (int i = 0; i < 1000; i++) { ???????????System.out.println("生成数据:" + i); ???????????// submit是个block方法 ???????????publiser.submit(i); ???????} ???????publiser.submit(111); ???????publiser.submit(222); ???????publiser.submit(333); ???????// 5. 结束后 关闭发布者 ???????// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭 ???????publiser.close(); ???????// 主线程延迟停止, 否则数据没有消费就退出 ???????Thread.currentThread().join(1000); ???}}

(3)Reactive stream 响应式流——Webflux响应式编程利器

原文地址:https://www.cnblogs.com/algerfan/p/10305130.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved