Unified log search deployment (ES, Logstash, Kafka, Flume)


flume: collects the logs and ships them to Kafka

kafka: acts as a buffer, holding the logs coming from Flume

es: serves as the storage backend where the logs end up

logstash: filters and transforms the logs
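
End to end, the data path looks like this:

application log file -> Flume (exec source tailing the file) -> Kafka topic (buffer) -> Logstash (filtering) -> Elasticsearch (storage and search)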

Flume deployment

Download and extract the package

cd /usr/local/src && wget http://10.80.7.177/install_package/apache-flume-1.7.0-bin.tar.gz && tar zxf apache-flume-1.7.0-bin.tar.gz -C /usr/local/

Edit the flume-env.sh script to set the JVM startup options

cd /usr/local/apache-flume-1.7.0-bin
vim conf/flume-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_121/
export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"   # heap size for the agent JVM

Edit the Flume configuration file

vim conf/flume_kfk.conf    (note: the file name can be anything)
agent.sources = s1
agent.channels = c1
agent.sinks = k1

agent.sources.s1.type = exec

# absolute path of the log file to collect
agent.sources.s1.command = tail -F /root/test.log
agent.sources.s1.channels = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 100
# configure the Kafka sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
agent.sinks.k1.brokerList = 10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092
# Kafka topic
agent.sinks.k1.topic = kafkatest
# serialization
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.channel = c1

Create the Kafka topic

cd /data1/kafka/kafka_2.11-0.10.1.0/ && ./bin/kafka-topics.sh --create --topic kafkatest --replication-factor 3 --partitions 20 --zookeeper 10.90.11.19:12181
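
To verify the topic before wiring Flume to it, describe it against the same ZooKeeper:

cd /data1/kafka/kafka_2.11-0.10.1.0/ && ./bin/kafka-topics.sh --describe --topic kafkatest --zookeeper 10.90.11.19:12181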

Start Flume

/usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_kfk.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
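
Because the command enables HTTP monitoring on port 9876, the agent's channel and sink counters can be pulled as JSON once it is up (run on the agent host):

curl http://localhost:9876/metrics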

Test

Test: append lines to /root/test.log, then log in to Kafka Manager and check whether messages are arriving on the kafkatest topic. If they are, the Flume-to-Kafka path works; if not, recheck the configuration.
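
If Kafka Manager is not available, the console consumer that ships with Kafka gives the same answer; a minimal check, using a broker from the list in the Flume config:

echo "hello flume $(date)" >> /root/test.log
cd /data1/kafka/kafka_2.11-0.10.1.0/ && ./bin/kafka-console-consumer.sh --bootstrap-server 10.90.11.19:19092 --topic kafkatest --from-beginning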

Deploy supervisor to watch the Flume agent

Installing supervisor itself is not covered again here; for details see https://www.cnblogs.com/sailq21/p/9227592.html

Edit /etc/supervisord.conf

[unix_http_server]
file=/data/ifengsite/flume/supervisor.sock   ; the path to the socket file

[inet_http_server]             ; inet (TCP) server disabled by default
port=9001                      ; ip_address:port specifier, *:port for all iface

[supervisord]
logfile=/data/logs/supervisord.log   ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB          ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10             ; # of main logfile backups; 0 means none, default 10
loglevel=info                  ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid   ; supervisord pidfile; default supervisord.pid
nodaemon=false                 ; start in foreground if true; default false
minfds=1024                    ; min. avail startup file descriptors; default 1024
minprocs=200                   ; min. avail process descriptors; default 200

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///data/ifengsite/flume/supervisor.sock ; use a unix:// URL for a unix socket

[include]
files = /etc/supervisord.d/*.conf

Edit the Flume program definition for supervisor (e.g. /etc/supervisord.d/flume.conf)

[program:flume-push]
directory = /usr/local/apache-flume-1.7.0-bin/
command = /usr/local/apache-flume-1.7.0-bin/bin/flume-ng agent -n agent -Dflume.monitoring.type=http -Dflume.monitoring.port=9876 -c conf -f /usr/local/apache-flume-1.7.0-bin/conf/flume_kfk.conf -Dflume.root.logger=ERROR,console -Dorg.apache.flume.log.printconfig=true
autostart = true
startsecs = 5
autorestart = true
startretries = 3
user = root
redirect_stderr = true
stdout_logfile_maxbytes = 20MB
stdout_logfile_backups = 20
stdout_logfile = /data/ifengsite/flume/logs/flume-supervisor.log
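
If supervisord is already running, it can pick up the new program file without a full restart:

supervisorctl reread    # discover the new /etc/supervisord.d/*.conf file
supervisorctl update    # start the newly added flume-push program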

Create the log directory and start supervisor

mkdir -p /data/ifengsite/flume/logs/
supervisord -c /etc/supervisord.conf
supervisorctl reload    # restart supervisor after config changes
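
Confirm that supervisor is actually managing the agent:

supervisorctl status flume-push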

Test: browse to http://<server-ip>:9001 to view the supervisor web UI

If the Flume-to-Kafka path works, configure Logstash next.

Edit the Logstash pipeline config flume_kfk.conf

vim /etc/logstash/conf.d/flume_kfk.conf
input {
    kafka {
        bootstrap_servers => ["10.90.11.19:19092,10.90.11.32:19092,10.90.11.45:19092,10.90.11.47:19092,10.90.11.48:19092"]
        client_id => "test"
        group_id => "test"
        consumer_threads => 5
        decorate_events => true
        topics => ["kafkatest"]
        type => "testqh"
    }
}

filter {
    mutate {
        gsub => ["message", "\\x", "\\\x"]
    }
    json {
        source => "message"
        remove_field => ["message", "beat", "tags", "source", "kafka"]
    }
    date {
        match => ["timestamp", "ISO8601"]
        timezone => "Asia/Shanghai"
        target => "@timestamp"
    }
}

# Output to stdout for debugging; swap in an elasticsearch output to ship to ES.
output {
    stdout {
        codec => rubydebug
    }
}
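
When the rubydebug output looks correct, the stdout block can be replaced with an Elasticsearch output. A minimal sketch; the ES address here is hypothetical, since the original does not give one:

output {
    elasticsearch {
        hosts => ["10.90.11.50:9200"]          # hypothetical ES node, replace with your cluster
        index => "kafkatest-%{+YYYY.MM.dd}"    # one index per day
    }
}

Then start Logstash against the file (the binary path depends on how Logstash was installed):

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/flume_kfk.conf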


Original article: https://www.cnblogs.com/sailq21/p/9230336.html
