分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > IT知识

flume 整合kafka

发布时间:2023-09-06 01:07责任编辑:白小东关键词:暂无标签

 背景:系统的数据量越来越大,日志不能再简单的文件的保存,如此日志将会越来越大,也不方便查找与分析,综合考虑下使用了flume来收集日志,收集日志后向kafka传递消息,下面给出具体的配置

# The configuration file needs to define the sources,# the channels and the sinks.# Sources, channels and sinks are defined per agent, ??# in this case called ‘agent‘agent.sources = r1agent.channels = c1agent.sinks = s1# For each one of the sources, the type is definedagent.sources.r1.type = netcatagent.sources.r1.bind = localhostagent.sources.r1.port = 10245agent.sources.r1.charset = UTF-8# The channel can be defined as follows.agent.sources.r1.channels = c1# Each sink‘s type must be definedagent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSinkagent.sinks.s1.topic = testagent.sinks.s1.brokerList = ip:9092agent.sinks.s1.requiredAcks = 1agent.sinks.s1.batchSize = 20agent.sinks.s1.channel = c1# Each channel‘s type is defined.agent.channels.c1.type = memory# Other config values specific to each type of channel(sink or source)# can be defined as well# In this case, it specifies the capacity of the memory channelagent.channels.c1.capacity = 100

启动方式:

   bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name agent -Dflume.root.logger=INFO,console

再启动之前一定要先启动kafka,这里可能会有一个错误

  

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired

这个是因为默认情况下kafka是广播的localhost,所以如果不是同一个机器需要修改下配置

advertised.listeners=PLAINTEXT://ip:9092把默认的localhost替换成IP地址 重新启动下就可以了.

flume 整合kafka

原文地址:http://www.cnblogs.com/EncryptingLife/p/7456188.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved