分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > IT知识

基于flume的日志系统

发布时间:2023-09-06 02:29责任编辑:董明明关键词:暂无标签

思路

  1. 日志统一输出至kafka
  2. flume agent充当kafka消费者,将日志输出至elasticsearch
  3. kibana负责展示日志信息

准备工作

  1. flume 1.8 kafka 1.1.0 elasticsearch&kibana 6.5.4
  2. 项目中一般使用log4j等日志框架,需自定义JsonLayout
  3. flume支持的elasticsearch较低,需自定义flume es sink
  4. elasticsearch默认使用utc时间,日志时间需保持一致

JsonLayout

只需要在自定义的JsonLayout中构造一个PatternLayout帮助format日志消息即可 日志格式内容如下

public class JsonLogBean { ???private String system; ???private String ip; ???private String message; ???private String level; ???private String time; ???????//get set 略 ???public JsonLogBean(){} ???public JsonLogBean(String system, String ip,String message, String level, String time) { ???????this.system = system; ???????this.ip = ip; ???????this.message = message; ???????this.level = level; ???????this.time = time; ???}}

log4j1

public class JsonPartternLayout extends PatternLayout{ ???private String system; ???//PatternLayout 默认将异常交给WriterAppender处理 这里改为false ???public boolean ignoresThrowable() { ???????return false; ???} ???private static String ip; ???private static SimpleDateFormat utcFormater; ???static { ???????utcFormater = new SimpleDateFormat("yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z‘"); ???????utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));//时区定义并进行时间获取 ???????try { ???????????ip = InetAddress.getLocalHost().getHostAddress(); ???????} catch (UnknownHostException e) { ???????????//ignore ???????} ???} ???@Override ???public String format(LoggingEvent event) { ???????StringBuilder sb = new StringBuilder(); ???????sb.append(super.format(event)); ???????String[] s = event.getThrowableStrRep(); ???????if (s != null) { ???????????int len = s.length; ???????????for (int i = 0; i < len; i++) { ???????????????sb.append(s[i]); ???????????????sb.append("\n"); ???????????} ???????} ???????String time = utcFormater.format(new Date(event.getTimeStamp())); ???????return JSON.toJSONString(new JsonLogBean(system, ip, sb.toString(), event.getLevel().toString(), time)) + "\n"; ???} ???public String getSystem() { ???????return system; ???} ???public void setSystem(String system) { ???????this.system = system; ???}}

log4j2

@Plugin(name = "JsonPartternLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)public class JsonPartternLayout extends AbstractStringLayout { ???private PatternLayout patternLayout; ???private String system; ???private static String ip; ???private static SimpleDateFormat utcFormater; ???static { ???????utcFormater = new SimpleDateFormat("yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z‘"); ???????utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));//时区定义并进行时间获取 ???????try { ???????????ip = InetAddress.getLocalHost().getHostAddress(); ???????} catch (UnknownHostException e) { ???????????//ignore ???????} ???} ???public JsonPartternLayout(final Configuration config, final RegexReplacement replace, final String eventPattern, ????????????final PatternSelector patternSelector, final Charset charset, final boolean alwaysWriteExceptions, ????????????final boolean disableAnsi, final boolean noConsoleNoAnsi, final String headerPattern, ????????????final String footerPattern, final String system) { ???????super(config, charset, ???????????????newSerializerBuilder() ???????????????????????.setConfiguration(config) ???????????????????????.setReplace(replace) ???????????????????????.setPatternSelector(patternSelector) ???????????????????????.setAlwaysWriteExceptions(alwaysWriteExceptions) ???????????????????????.setDisableAnsi(disableAnsi) ???????????????????????.setNoConsoleNoAnsi(noConsoleNoAnsi) ???????????????????????.setPattern(headerPattern) ???????????????????????.build(), ???????????????newSerializerBuilder() ???????????????????????.setConfiguration(config) ???????????????????????.setReplace(replace) ???????????????????????.setPatternSelector(patternSelector) ???????????????????????.setAlwaysWriteExceptions(alwaysWriteExceptions) ???????????????????????.setDisableAnsi(disableAnsi) ???????????????????????.setNoConsoleNoAnsi(noConsoleNoAnsi) ???????????????????????.setPattern(footerPattern) ???????????????????????.build()); ???????this.patternLayout = PatternLayout.newBuilder() ???????????????.withPattern(eventPattern) ???????????????.withPatternSelector(patternSelector) ???????????????.withConfiguration(config) ???????????????.withRegexReplacement(replace) ???????????????.withCharset(charset) ???????????????.withDisableAnsi(disableAnsi) ???????????????.withAlwaysWriteExceptions(alwaysWriteExceptions) ???????????????.withNoConsoleNoAnsi(noConsoleNoAnsi) ???????????????.withHeader(headerPattern) ???????????????.withFooter(footerPattern) ???????????????.build(); ???????this.system = system; ???} ???/** ????* ????* @param event ????* @return ????*/ ???public String toSerializable(LogEvent event) { ???????String msg = this.patternLayout.toSerializable(event); ???????String time = utcFormater.format(new Date(event.getTimeMillis())); ???????return JSON.toJSONString(new JsonLogBean(system, ip,msg, event.getLevel().name(), time)) + "\n"; ???} ???@PluginBuilderFactory ???public static Builder newBuilder() { ???????return new Builder(); ???} ???/** ????* ????*/ ???public static class Builder implements org.apache.logging.log4j.core.util.Builder<JsonPartternLayout> { ???????@PluginBuilderAttribute ???????private String pattern = DEFAULT_CONVERSION_PATTERN; ???????@PluginElement("PatternSelector") ???????private PatternSelector patternSelector; ???????@PluginConfiguration ???????private Configuration configuration; ???????@PluginElement("Replace") ???????private RegexReplacement regexReplacement; ???????// LOG4J2-783 use platform default by default ???????@PluginBuilderAttribute ???????private Charset charset = Charset.defaultCharset(); ???????@PluginBuilderAttribute ???????private boolean alwaysWriteExceptions = true; ???????@PluginBuilderAttribute ???????private boolean disableAnsi = !useAnsiEscapeCodes(); ???????@PluginBuilderAttribute ???????private boolean noConsoleNoAnsi; ???????@PluginBuilderAttribute ???????private String header; ???????@PluginBuilderAttribute ???????private String footer; ???????@PluginBuilderAttribute ???????private String system; ???????private Builder() { ???????} ???????private boolean useAnsiEscapeCodes() { ???????????PropertiesUtil propertiesUtil = PropertiesUtil.getProperties(); ???????????boolean isPlatformSupportsAnsi = !propertiesUtil.isOsWindows(); ???????????boolean isJansiRequested = !propertiesUtil.getBooleanProperty("log4j.skipJansi", true); ???????????return isPlatformSupportsAnsi || isJansiRequested; ???????} ???????public JsonPartternLayout build() { ???????????// fall back to DefaultConfiguration ???????????if (configuration == null) { ???????????????configuration = new DefaultConfiguration(); ???????????} ???????????return new JsonPartternLayout(configuration, regexReplacement, pattern, patternSelector, charset, ???????????????????alwaysWriteExceptions, disableAnsi, noConsoleNoAnsi, header, footer, system); ???????} ???}}

Es Sink

这里我们直接在flume-ng-elasticsearch-sink的源码上做修改,为避免冲突我改了相关的包名 源码见这里 pom配置如下

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" ????????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ???<modelVersion>4.0.0</modelVersion> ???<groupId>org.apache.flume</groupId> ???<artifactId>flume-ng-high-eslog-sink</artifactId> ???<version>1.8</version> ???<build> ???????<plugins> ???????????<plugin> ???????????????<groupId>org.apache.rat</groupId> ???????????????<artifactId>apache-rat-plugin</artifactId> ???????????</plugin> ???????</plugins> ???</build> ???<dependencies> ???????<dependency> ???????????<groupId>org.apache.flume</groupId> ???????????<artifactId>flume-ng-sdk</artifactId> ???????????<version>1.8.0</version> ???????</dependency> ???????<dependency> ???????????<groupId>org.apache.flume</groupId> ???????????<artifactId>flume-ng-core</artifactId> ???????????<version>1.8.0</version> ???????</dependency> ???????<dependency> ???????????<groupId>org.elasticsearch</groupId> ???????????<artifactId>elasticsearch</artifactId> ???????????<version>6.5.4</version> ???????????<optional>true</optional> ???????</dependency> ???????<dependency> ???????????<groupId>org.elasticsearch.client</groupId> ???????????<artifactId>transport</artifactId> ???????????<version>6.5.4</version> ???????</dependency> ???????<dependency> ???????????<groupId>org.slf4j</groupId> ???????????<artifactId>slf4j-api</artifactId> ???????????<version>1.6.1</version> ???????</dependency> ???????<dependency> ???????????<groupId>org.slf4j</groupId> ???????????<artifactId>slf4j-log4j12</artifactId> ???????????<version>1.6.1</version> ???????????<scope>test</scope> ???????</dependency> ???????<dependency> ???????????<groupId>commons-lang</groupId> ???????????<artifactId>commons-lang</artifactId> ???????????<version>2.5</version> ???????</dependency> ???????<dependency> ???????????<groupId>com.google.guava</groupId> ???????????<artifactId>guava</artifactId> ???????????<version>11.0.2</version> ???????</dependency> ???????<dependency> ???????????<groupId>junit</groupId> ???????????<artifactId>junit</artifactId> ???????????<version>RELEASE</version> ???????</dependency> ???</dependencies></project> ???????

flume-ng-elasticsearch-sink的源码还是比较少的,以下几个比较重要的改动点

将elasticsearch lib下的jar包copy到flume lib下 以下是我copy到flume的相关jar包 如果有jar缺失或冲突查看下flume的日志很容易就能够解决

-rw-r--r--@ 1 zhanghuan ?staff ???10M ?1 ?4 22:26 elasticsearch-6.5.4.jar-rw-r--r--@ 1 zhanghuan ?staff ???16K ?1 ?4 22:26 elasticsearch-cli-6.5.4.jar-rw-r--r--@ 1 zhanghuan ?staff ???36K ?1 ?4 22:26 elasticsearch-core-6.5.4.jar-rw-r--r--@ 1 zhanghuan ?staff ???12K ?1 ?4 22:26 elasticsearch-launchers-6.5.4.jar-rw-r--r--@ 1 zhanghuan ?staff ???11K ?1 ?4 22:26 elasticsearch-secure-sm-6.5.4.jar-rw-r--r--@ 1 zhanghuan ?staff ??110K ?1 ?4 22:26 elasticsearch-x-content-6.5.4.jar-rw-r--r--@ 1 zhanghuan ?staff ??1.6M ?1 ?4 22:37 lucene-analyzers-common-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ???98K ?1 ?4 22:37 lucene-backward-codecs-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??2.9M ?1 ?4 22:37 lucene-core-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ???85K ?1 ?4 22:37 lucene-grouping-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??202K ?1 ?4 22:37 lucene-highlighter-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??143K ?1 ?4 22:37 lucene-join-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ???50K ?1 ?4 22:37 lucene-memory-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ???93K ?1 ?4 22:37 lucene-misc-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??259K ?1 ?4 22:37 lucene-queries-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??373K ?1 ?4 22:37 lucene-queryparser-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??259K ?1 ?4 22:37 lucene-sandbox-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ???14K ?1 ?4 22:37 lucene-spatial-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??231K ?1 ?4 22:37 lucene-spatial-extras-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??295K ?1 ?4 22:37 lucene-spatial3d-7.5.0.jar-rw-r--r--@ 1 zhanghuan ?staff ??240K ?1 ?4 22:37 lucene-suggest-7.5.0.jar-rw-r--r-- ?1 zhanghuan ?staff ??7.4K ?1 ?7 23:04 transport-6.5.4.jar-rw-r--r-- ?1 zhanghuan ?staff ???77K ?1 ?7 23:08 transport-netty4-client-6.5.4.jar-rw-r--r-- ?1 zhanghuan ?staff ??107K ?1 ?7 23:10 reindex-client-6.5.4.jar-rw-r--r-- ?1 zhanghuan ?staff ???72K ?1 ?7 23:11 percolator-client-6.5.4.jar-rw-r--r-- ?1 zhanghuan ?staff ???60K ?1 ?7 23:12 lang-mustache-client-6.5.4.jar-rw-r--r-- ?1 zhanghuan ?staff ???75K ?1 ?7 23:13 parent-join-client-6.5.4.jar-rw-r--r-- ?1 zhanghuan ?staff ??1.1M ?1 ?7 23:16 hppc-0.7.1.jar-rw-r--r-- ?1 zhanghuan ?staff ??258K ?1 ?7 23:18 log4j-api-2.11.1.jar-rw-r--r-- ?1 zhanghuan ?staff ??1.5M ?1 ?7 23:19 log4j-core-2.11.1.jar-rw-r--r-- ?1 zhanghuan ?staff ???50K ?1 ?8 19:29 t-digest-3.2.jar-rw-r--r--@ 1 zhanghuan ?staff ??3.6M ?1 ?8 19:38 netty-all-4.1.25.Final.jar-rw-r--r--@ 1 zhanghuan ?staff ??276K ?1 ?8 19:42 jackson-core-2.8.11.jar-rw-r--r--@ 1 zhanghuan ?staff ???50K ?1 ?8 19:42 jackson-dataformat-cbor-2.8.11.jar-rw-r--r--@ 1 zhanghuan ?staff ???72K ?1 ?8 19:42 jackson-dataformat-smile-2.8.11.jar-rw-r--r--@ 1 zhanghuan ?staff ???40K ?1 ?8 19:42 jackson-dataformat-yaml-2.8.11.jar-rw-r--r-- ?1 zhanghuan ?staff ???28K ?1 10 21:33 flume-ng-high-eslog-sink-1.8.jar-rw-r--r--@ 1 zhanghuan ?staff ???62K ?1 12 19:35 log4j-1.2-api-2.11.1.jar

测试

这里我们使用log4j2 配置如下

<?xml version="1.0" encoding="UTF-8"?><Configuration status="WARN" packages="com.mine.log"> ???<Appenders> ???????<Console name="Console" target="SYSTEM_OUT"> ???????????<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}[%t] %-5p %c %msg%xEx%n" /> ???????</Console> ???????<Kafka name="Kafka" topic="test1"> ???????????<JsonPartternLayout system = "mlog" pattern = "[%t] %c %msg%xEx%n"/> ???????????<Property name="bootstrap.servers">localhost:9092</Property> ???????</Kafka> ???</Appenders> ???<Loggers> ???????<Root level="debug"> ???????????<AppenderRef ref="Console"/> ???????????<AppenderRef ref="Kafka"/> ???????</Root> ???</Loggers></Configuration>

测试代码

public class LogTest { ???private static final Logger logger = LogManager.getLogger(LogTest.class); ???@org.junit.Test ???public void test() { ???????logger.info("输出信息……"); ???????logger.trace("随意打印……"); ???????logger.debug("调试信息……"); ???????logger.warn( "警告信息……"); ???????try { ???????????new Thread(new Runnable() { ???????????????public void run() { ???????????????????logger.warn("test……"); ???????????????} ???????????}).start(); ???????????LogTest.class.getClass().forName("123"); ???????} catch (Exception e) { ???????????logger.error("处理业务逻辑的时候发生一个错误……", e); ???????} ???}}

flume配置

#Name the components on this agentagent.sources = r1agent.sinks = k1agent.channels = c1#Describe/configure the sourceagent.sources.r1.type = org.apache.flume.source.kafka.KafkaSourceagent.sources.r1.channels = channel1agent.sources.r1.batchSize = 5000agent.sources.r1.batchDurationMillis = 2000agent.sources.r1.kafka.bootstrap.servers = localhost:9092agent.sources.r1.kafka.topics = test1agent.sources.r1.kafka.consumer.group.id = custom.g.id#Describe the sinkagent.sinks.k1.type = org.apache.flume.sink.hielasticsearch.ElasticSearchSinkagent.sinks.k1.hostNames = 127.0.0.1:9300agent.sinks.k1.indexName = log_indexagent.sinks.k1.indexType = log_table#agent.sinks.k1.clusterName = log_clusteragent.sinks.k1.batchSize = 500agent.sinks.k1.ttl = 5dagent.sinks.k1.serializer = org.apache.flume.sink.hielasticsearch.ElasticSearchDynamicSerializer#Use a channel which buffers events in memoryagent.channels.c1.type = memoryagent.channels.c1.capacity = 1000agent.channels.c1.transactionCapacity = 100#Bind the source and sink to the channelagent.sources.r1.channels = c1agent.sinks.k1.channel = c1

启动kafka flume elasticsearch kibana即可 配置相关基本下载即用我就略过了哈 日志展示如下

基于flume的日志系统

原文地址:https://www.cnblogs.com/adia/p/10261273.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved