分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 教程案例

JStorm之Topology调度

发布时间:2023-09-06 01:42责任编辑:赖小花关键词:暂无标签
???????????????????????????
???????????????????????? topology在服务端提交过程中,会经过一系列的验证和初始化:TP结构校验、创建本地文件夹并拷贝序列化文件jar包、生成znode用于存放TP和task等信息,最后一步才进行任务分配。例如以下图:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbGlobTBfMQ==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" ?/>
提交主函数位于ServiceHandler.java中

private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status) throws FailedAssignTopologyException {//1、创建topology的分配事件TopologyAssignEvent assignEvent = new TopologyAssignEvent();assignEvent.setTopologyId(topologyId);assignEvent.setScratch(false);assignEvent.setTopologyName(topologyName);assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status)); ?//2、丢入事件处理队列TopologyAssign.push(assignEvent); ?//3、等待时间返回boolean isSuccess = assignEvent.waitFinish();if (isSuccess == true) {LOG.info("Finish submit for " + topologyName);} else {throw new FailedAssignTopologyException(assignEvent.getErrorMsg());}}

这当中最基本的是事件丢入队列后兴许的处理过程。事件分配由TopologyAssign线程处理,这个线程的流程非常清晰,监听事件队列。一旦有事件进入,立即取出,进行doTopologyAssignment,例如以下:
public void run() {LOG.info("TopologyAssign thread has been started");runFlag = true;while (runFlag) {TopologyAssignEvent event;try {event = queue.take();} catch (InterruptedException e1) {continue;}if (event == null) {continue;}boolean isSuccess = doTopologyAssignment(event);..............}

任务分配的核心代码位于TopologyAssign.java中
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {String topologyId = event.getTopologyId();LOG.info("Determining assignment for " + topologyId);TopologyAssignContext context = prepareTopologyAssign(event);Set<ResourceWorkerSlot> assignments = null;if (!StormConfig.local_mode(nimbusData.getConf())) {IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);//開始进行作业的调度assignments = scheduler.assignTasks(context);} else {assignments = mkLocalAssignment(context);}............}

调用栈例如以下:

分配原理是首先获得全部可用的supervisor,推断supervisor可用的标准是是否有空暇的slot,也就是是否全部supervisor.slots.ports指定port都被占用,然后计算出须要分配几个woker。由于一个woker相应一个port,当然这些信息的採集都是来自Zookeeper,如今我们来分析分配的核心代码:
WorkerMaker.java
//注意參数,result是这个作业须要的槽位。传入前仅仅知道须要槽位的数量,详细分配到哪台supervisor上还没指定
//supervisors指当前集群中全部可用的supervisor。即有空暇port的
private void putWorkerToSupervisor(List<ResourceWorkerSlot> result,List<SupervisorInfo> supervisors) {int key = 0;//按所需槽位遍历,每次分配一个for (ResourceWorkerSlot worker : result) {//首先进行必要的推断和置位if (supervisors.size() == 0)return;if (worker.getNodeId() != null)continue;if (key >= supervisors.size())key = 0;//1、取出第一个supervisorSupervisorInfo supervisor = supervisors.get(key);worker.setHostname(supervisor.getHostName());worker.setNodeId(supervisor.getSupervisorId());worker.setPort(supervisor.getWorkerPorts().iterator().next());//槽位用完则从集合中删除,不再參与分配supervisor.getWorkerPorts().remove(worker.getPort());if (supervisor.getWorkerPorts().size() == 0)supervisors.remove(supervisor);//当一个supervisor分配完后便不再使用。除非supervisor不够用key++;}}

从上面的代码中我们能够看到,眼下槽位分配没考虑机器负载,槽位的分配并不一定平均,比方第一个supervisor有10个槽位,剩下的supervisor仅仅有两个,那么还是要每一个supervisor分配一个woker的。

注意一个问题,在上面代码中supervisors这个集合是经过排序的,排序规则例如以下:

private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> result,List<SupervisorInfo> supervisors) {...........supervisors = this.getCanUseSupervisors(supervisors);Collections.sort(supervisors, new Comparator<SupervisorInfo>() {@Overridepublic int compare(SupervisorInfo o1, SupervisorInfo o2) {// TODO Auto-generated method stubreturn -NumberUtils.compare(o1.getWorkerPorts().size(), o2.getWorkerPorts().size());}});this.putWorkerToSupervisor(result, supervisors);.............}
能够看到。当前排序规则是按slot多少的,我们兴许版本号中可能会考虑机器负载的一些因素吧。

???????????????
???????????????????
???

JStorm之Topology调度

原文地址:https://www.cnblogs.com/llguanli/p/8438798.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved