分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

Apache Ignite 改装(一) -- ?服务异步化支持

发布时间:2023-09-06 01:49责任编辑:白小东关键词:暂无标签
本文假设读者了解Apache Ignite,阅读过ignite service grid的官方文档,或使用过ignite的service grid,本文同样假设读者了解 java的CompletionStage的相关用法。本文涉及的ignite版本为2.4.0。

使用Apache Ignite的Service grid作为微服务开发框架, 通常是如下定义和实现Service的:

服务接口:
public interface MyService { ???public String sayHello(String to);}

本文将实现如下样式的Service,使其异步化:

异步化的服务接口:
public interface MyServiceAsync { ???public CompletionStage<String> sayHello(String to);}

当前ignite对上边这样的异步的service方法并没有remote支持。当调用端与服务部署再同一节点时,ignite会发起一个本地方法调用,这样是没有问题的,但是当服务部署端与调用端在不同节点时,ignite通过发起一个distributed task,将调用通过消息方式发布到服务部署节点,由于服务实现是异步的,通常来说,会返回一个未完成状态的CompletionStage,后续当真正complete的时候,调用端的CompletionStage并不会被notify,即调用端永远无法得到真正的调用结果。
为了能够支持CompletionStage的远程状态专递,我们需要对ignite进行如下改动:

org/apache/ignite/internal/processors/service/GridServiceProxy.java
...// line 192if(CompletionStage.class.isAssignableFrom(mtd.getReturnType())) { ???CompletableFuture<Object> cs = new CompletableFuture<>(); ??????????????????//call async and notify completion stage ???ctx.closure().callAsyncNoFailover( ???????GridClosureCallMode.BROADCAST, ???????new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args), ???????????Collections.singleton(node), ???????????false, ???????????waitTimeout, ???????????true).listen(f -> { ???????????????if(f.error() != null) { ???????????????????cs.completeExceptionally(f.error()); ???????????????}else if(f.isCancelled()) { ???????????????????cs.cancel(false); ???????????????} ???????????????if(f.isDone()) { ???????????????????try { ???????????????????????Object result = f.get(); ???????????????????????if(result != null && IgniteException.class.isAssignableFrom(result.getClass())) { ???????????????????????????cs.completeExceptionally((IgniteException)result); ???????????????????????}else { ???????????????????????????cs.complete(f.get()); ???????????????????????} ???????????????????} catch (IgniteCheckedException e) { ???????????????????????cs.completeExceptionally(e); ???????????????????} ???????????????} ???????????}); ???return cs;}...

这段代码做了如下的事情:检测当服务方法返回值是一个CompletionStage的时候,则创建一个CompletableFuture作为代理对象返回给调用端。随后监听服务的远程调用的结果,并且用这个结果来更新这个CompletableFuture。到这里,调用端的service proxy的改造就完成了。接下来,我们还需要改造服务节点这一端:

org/apache/ignite/internal/processors/job/GridJobWorker.java(line 618起的这个finally块),改造前:
finally { ???// Finish here only if not held by this thread. ?if (!HOLD.get()) ???????finishJob(res, ex, sndRes); ???else ???// Make sure flag is not set for current thread. ???// This may happen in case of nested internal task call with continuation. ???HOLD.set(false); ???ctx.job().currentTaskSession(null); ???if (reqTopVer != null) ???????GridQueryProcessor.setRequestAffinityTopologyVersion(null); ???}}
改造后:
finally { ???if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) { ???????final boolean sendResult = sndRes; ???????final IgniteException igException = ex; ???????@SuppressWarnings("unchecked") ???????CompletionStage<Object> cs = (CompletionStage<Object>)res; ???????cs.exceptionally(t->{ ???????????return new IgniteException(t); ???????}).thenAccept(r->{ ???????????if (!HOLD.get()) { ???????????????IgniteException e = igException; ???????????????finishJob(r, e, sendResult); ???????????} else ???????????// Make sure flag is not set for current thread. ???????????// This may happen in case of nested internal task call with continuation. ???????????????HOLD.set(false); ???????????ctx.job().currentTaskSession(null); ???????????if (reqTopVer != null) ???????????????GridQueryProcessor.setRequestAffinityTopologyVersion(null); ???????}); ???} else { ???????// Finish here only if not held by this thread. ???????if (!HOLD.get()) ???????????finishJob(res, ex, sndRes); ???????else ???????// Make sure flag is not set for current thread. ???????// This may happen in case of nested internal task call with continuation. ???????????HOLD.set(false); ???????ctx.job().currentTaskSession(null); ???????if (reqTopVer != null) ???????????GridQueryProcessor.setRequestAffinityTopologyVersion(null); ???}}

这里做的事情是:当在服务部署节点上拿到执行结果的时候,如果发现服务返回结果是一个CompletionStage,那么处理这个CompletionStage的exceptionally和thenAccept, 把结果发送给remote的调用端。

就这样,通过简单的改装,我们使ignite有了处理异步服务方法调用的能力。下边我们实现一个服务来看看改装结果:

服务定义与实现:
import java.util.concurrent.CompletionStage;import org.apache.ignite.services.Service;public interface MyService extends Service { ???public CompletionStage<String> sayHelloAsync(String to); ???public String sayHelloSync(String to);}
import java.util.concurrent.CompletionStage;public class MyServiceImpl implements MyService { ???private ScheduledExecutorService es; ???@Override public void init(ServiceContext ctx) throws Exception { ???????es = Executors.newSingleThreadScheduledExecutor(); ???} ???@Override public CompletionStage<String> sayHelloAsync(String to){ ?????CompletableFuture<String> ret = new CompletableFuture<>(); ???????//return "async hello $to" after 3 secs ???????es.schedule(()->ret.complete("async hello " + to), 3, TimeUnit.SECONDS); ???????return ret; ???} ???@Override public String sayHelloSync(String to){ ???????return "sync hello " + to; ???} ???...}

然后将服务部署在Service grid中:

...ServiceConfiguration sConf = new ServiceConfiguration();sConf.setName("myservice.version.1");sConf.setService(new MyServiceImpl());sConf.setMaxPerNodeCount(2);sConf.setTotalCount(4);ignite.services().deploy(sConf);...

然后启动一个客户端节点进行服务调用:

MyService service = ignite.services().serviceProxy("myservice.version.1", ?MyService.class, false);//test async serviceservice.sayHelloAsync("nathan").thenAccept(r->{ ???System.out.println(r);});//test sync serviceSystem.out.println(service.sayHelloSync("nathan"));...

输出结果:

sync hello nathanasync hello nathan

可以看到先输出了sync的结果,大约3秒后输出async的结果。

Apache Ignite 改装(一) -- ?服务异步化支持

原文地址:http://blog.51cto.com/12274728/2097094

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved