分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > IT知识

pinpoint web报警机制源码解析

发布时间:2023-09-06 02:10责任编辑:苏小强关键词:暂无标签

背景:(简述)

Pinpoint 是一套APM (Application Performance Management)工具,主要用于帮助分析系统的总体结构和组件如何相互调用,也可用于追踪线上性能问题,方便定位出现问题的点。

Pinpoint主要有如下几个组成部分:

  • Pinpoint Agent :通过字节码增强技术,附加到 用户的java 应用来做采样,程序启动时指定javaagent以及agentId,pplicationName。
  • HBase :用于存储agent采样的数据。
  • Pinpoint Collector :信息的收集者,部署在tomcat中,由于收集agent采样的数据并存入hbase。
  • Pinpoint Web :提供WEB_UI界面,部署在tomcat中,提供可视化页面,并且提供监控报警功能。(需要自行实现)

博文的主要内容与主要目的:

  pinpoint的报警功能是需要自行拓展实现的,网上有很多的实现方法,但是没有一个对于此报警机制的源码分析,本文旨在填补此处空白,让读者有一些基本的了解,如此调试报警的时候才能够得心应手。

背景知识:(详情可以百度一下,后面也会介绍相关内容)

  Spring Task:Spring内置的定时任务。

  Spring Batch: 一个大数据量的并行处理框架。

概述:

   通过Spring Task的定时任务,每分钟做一次通过SpringBatch的批处理检查。

该模块github源码地址:

  https://github.com/naver/pinpoint/tree/master/web/src/main/java/com/navercorp/pinpoint/web/alarm

源码解析:(本文重点)

  1.定时任务入口: src/main/resources/batch/applicationContext-batch-schedule.xml

<task:scheduled-tasks scheduler="scheduler"> ??????<task:scheduled ref="batchJobLauncher" method="alarmJob" cron="0 0/1 * * * *" /> ??????<!--省略--></task:scheduled-tasks><task:scheduler id="scheduler" pool-size="5"/>

此段代码标签使用的是SpringTask的标签,大概意思为,定义了一个线程池大小为5的调度器scheduler。

scheduler执行的任务有三个,batchJobLauncher的alarmJob方法,每一分钟执行一次,alarmJob,顾名思义,报警的任务。

  2.批处理任务入口: src/main/resources/batch/applicationContext-alarmJob.xml

前置说明-SpringBatch批处理框架中与此相关的解释:

如果是批处理的话,自然有批处理任务(对应下面的Job标签),每个任务自然有一个或者多个步骤(对应下面的Step标签)。

每个步骤有三个操作,读取数据(对应reader),处理数据(对应processor),回写数据(对应writer)。这三者中的参数是按照顺序传递的。

大家可能会想,报警机制和读取数据,处理数据,回写数据有什么关系吗?下面我说明一下pinpoint相关的对应的业务关系:

reader:读取数据 => 通过用户配置的规则提供Checker,即异常校验器。

processor:处理数据 => 用Checker进行校验,标记异常状态。

writer:回写数据 => 判断Checker是否有异常情况,有则报警。

下面的配置的源码:
<!--定义了一个alramJob的批处理任务-->  <batch:job id="alarmJob"> ???    <batch:step id="alarmPartitionStep"> ???????      <!--此alarmJob只有一个Step--> ???????<batch:partition step="alarmStep" partitioner="alarmPartitioner"> ???????????<!--设置执行的线程池--> ???????????<batch:handler task-executor="alarmPoolTaskExecutorForPartition" /> ???????</batch:partition> ???</batch:step> ???<batch:listeners> ??????<batch:listener ref="jobFailListener"/> ???</batch:listeners></batch:job><batch:step id="alarmStep"> ???<!--代表step的一种处理策略--> ???<batch:tasklet> ???????<!--批处理流程--> ???????<!-- 顺序执行 ???????reader:读取数据 => 提供Checker,即异常校验器,见下面的bean ???????processor:处理数据 => 用Checker进行校验,见下面的bean ???????writer:回写数据 => 判断Checker是否有异常情况,有则报警,见下面的bean ???????--> ???????<batch:chunk reader="reader" processor="processor" writer="writer" commit-interval="1"/> ???</batch:tasklet></batch:step><bean id="alarmPartitioner" class="com.navercorp.pinpoint.web.alarm.AlarmPartitioner"/><bean id="reader" class="com.navercorp.pinpoint.web.alarm.AlarmReader" scope="step"/><bean id="processor" class="com.navercorp.pinpoint.web.alarm.AlarmProcessor" scope="step"/><bean id="writer" class="com.navercorp.pinpoint.web.alarm.AlarmWriter" scope="step"/>

  3.通过对AlarmReader、AlarmProcessor、AlarmWriter的解析,梳理报警原理

(1) com.navercorp.pinpoint.web.alarm.AlarmReader:实现了ItemReader接口 
// StepExecutionListener监听器,可以定义Step开始前后的操作public class AlarmReader implements ItemReader<AlarmChecker>, StepExecutionListener { ???...   // 报警所用的Checker在内存里 ???private final Queue<AlarmChecker> checkers = new ConcurrentLinkedDeque<>(); ???... ???// Checker出队,供processor使用,   @Override ???public AlarmChecker read() { ???????return checkers.poll(); ???} ???// 批处理之前,将应用的报警规则加入Checker之中 ???@Override ???public void beforeStep(StepExecution stepExecution) {     // 查询所有的应用 ???????List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();     // 根据应用用户配置的规则,添加Checker到队列当中 ???????for (Application application : applicationList) { ???????????addChecker(application); ???????} ???} ???private void addChecker(Application application) {     // 根据应用名称获取所有的规则,应用名称就是配置agent的时候,指定的applicationName ???????List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName()); ???????long timeSlotEndTime = System.currentTimeMillis(); ???????Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>(); ???????// 遍历规则 ???????for (Rule rule : rules) {       ?// CheckerCategory是一个枚举类,预置了所有的报警规则模版,比如失败请求次数、慢请求次数等 ???????????CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());       ?// 数据收集器是为检验规则准备的,例如Rule是失败请求次数,但是次数从哪里来,就是从这个收集器来的 ???????????DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());       // 这里是一个基于Map的缓存 ???????????if (collector == null) { ???????????????collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime); ???????????????collectorMap.put(collector.getDataCollectorCategory(), collector); ???????????} ???????????// 创建Checker,有兴趣的读者可以看看CheckerCategroy的源码,设计的还是很不错的。       ?// AlaramChecker是一个抽象方法,具体的功能由子类实现 ???????????AlarmChecker checker = checkerCategory.createChecker(collector, rule);       // 加入队列 ???????????checkers.add(checker); ???????} ???????????} ???...}
 (2) com.navercorp.pinpoint.web.alarm.AlarmProcessor:实现了ItemProcessor接口
public class AlarmProcessor implements ItemProcessor<AlarmChecker, AlarmChecker> {   // 此处的AlarmChecker是上面的read()方法传递过来的
???@Override ???public AlarmChecker process(AlarmChecker checker) { ???????// check,顾名思义,检验,标记是否有异常情况,check()方法见下 ???????checker.check(); ???????return checker; ???}}
com.navercorp.pinpoint.web.alarm.checker.AlarmProcessor
protected abstract boolean decideResult(T value); ???public void check() { ???????// 收集数据 ???????dataCollector.collect(); ???????// 标记是否有异常情况,意为是否满足报警的阀值,decideResult是一个抽象方法     // detected字段在后续的Writter中会被检查 ???????detected = decideResult(getDetectedValue()); }
(3)com.navercorp.pinpoint.web.alarm.AlarmWriter:实现了ItemWriter接口
public class AlarmWriter implements ItemWriter<AlarmChecker> { ???// 需要用户自定义配置在Spring中的AlarmMessageSender,如果不配置,则是一个空实现 ???@Autowired(required = false) ???private AlarmMessageSender alarmMessageSender = new EmptyMessageSender(); ???@Autowired ???private AlarmService alarmService;
   // 实现的接口的方法,主要内容 ???@Override ???public void write(List<? extends AlarmChecker> checkers) throws Exception { ???????Map<String, CheckerResult> beforeCheckerResults = alarmService.selectBeforeCheckerResults(checkers.get(0).getRule().getApplicationId()); ???????// 遍历上面传递的Checker ???????for (AlarmChecker checker : checkers) { ???????????CheckerResult beforeCheckerResult = beforeCheckerResults.get(checker.getRule().getCheckerName()); ???????????if (beforeCheckerResult == null) { ???????????????beforeCheckerResult = new CheckerResult(checker.getRule().getApplicationId(), checker.getRule().getCheckerName(), false, 0, 1); ???????????} ???????????// 对上面的Processor标记的detected值进行检查 ???????????if (checker.isDetected()) { ???????????????sendAlarmMessage(beforeCheckerResult, checker); ???????????} ???????????// 记录报警历史 ???????????alarmService.updateBeforeCheckerResult(beforeCheckerResult, checker); ???????} ???} ???private void sendAlarmMessage(CheckerResult beforeCheckerResult, AlarmChecker checker) { ???????if (isTurnToSendAlarm(beforeCheckerResult)) { ???????????// 是否配置了发送报警短信 ???????????if (checker.isSMSSend()) { ???????????????alarmMessageSender.sendSms(checker, beforeCheckerResult.getSequenceCount() + 1); ???????????} ???????????// 是否配置了发送报警邮件 ???????????if (checker.isEmailSend()) { ???????????????alarmMessageSender.sendEmail(checker, beforeCheckerResult.getSequenceCount() + 1); ???????????} ???????} ???}  // ...}

  

pinpoint web报警机制源码解析

原文地址:https://www.cnblogs.com/langshiquan/p/9497464.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved