Implement the pipeline shown in the figure. For detailed steps, refer to the official documentation (http://flume.apache.org/FlumeUserGuide.html); Flume releases new versions fairly quickly.
Contents of the flume1.conf configuration file:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Define the source: spooldir watches a directory for new files
a1.sources.r1.type = spooldir
# Create this directory first and make sure it is empty
a1.sources.r1.spoolDir = /logs

# Sink events to Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Target Kafka topic
a1.sinks.k1.kafka.topic = haha1
# Kafka broker addresses and ports
a1.sinks.k1.kafka.bootstrap.servers = zhiyou01:9092,zhiyou02:9092,zhiyou03:9092
# Number of events to batch per producer commit
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

# Channel: a file channel buffers data on disk, which is safer than memory
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

# Bind source r1 and sink k1 together through channel c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
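Assuming the standard Kafka and Flume distributions are on the PATH (the commands below are guarded with `command -v` so the sketch is a no-op where they are not), the agent above can be launched and smoke-tested like this. The topic name, broker list, and spool directory are the ones from flume1.conf; the partition and replication counts are illustrative:

```shell
TOPIC=haha1
BROKERS=zhiyou01:9092,zhiyou02:9092,zhiyou03:9092

# Create the target topic before starting the agent
command -v kafka-topics.sh >/dev/null && \
  kafka-topics.sh --create --zookeeper zhiyou01:2181 \
    --replication-factor 2 --partitions 3 --topic "$TOPIC" || true

# Start the agent; --name must match the property prefix (a1)
command -v flume-ng >/dev/null && \
  flume-ng agent --conf conf --conf-file flume1.conf \
    --name a1 -Dflume.root.logger=INFO,console & 

# Drop a test file into the spool directory; Flume renames it to
# test.log.COMPLETED once it has been ingested
[ -d /logs ] && echo "hello flume" > /logs/test.log || true
```

Events can then be checked with `kafka-console-consumer.sh --bootstrap-server zhiyou01:9092 --topic haha1 --from-beginning`.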
Contents of the flume2.conf configuration file (see the corresponding screenshot from the official documentation). The detailed configuration is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: consume messages from Kafka
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092
a1.sources.r1.kafka.topics = test

# Define an interceptor that adds a timestamp header to each event
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Sink: deliver events to HDFS
a1.sinks.k1.type = hdfs
# Use the cluster nameservice name here;
# for a single NameNode write it directly, e.g. hdfs://han01/xxx
a1.sinks.k1.hdfs.path = hdfs://ns/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
# Do not roll files based on event count
a1.sinks.k1.hdfs.rollCount = 0
# Roll a file on HDFS when it reaches 128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
# Roll a file on HDFS every 60 seconds
a1.sinks.k1.hdfs.rollInterval = 60

# Channel: a memory channel buffers events in RAM
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind source r1 and sink k1 together through channel c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
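With the first agent publishing to Kafka, the second agent is started the same way, and its output can be inspected on HDFS. A sketch under the same assumptions as before (`flume-ng` and `hdfs` on the PATH, guarded so the block is a no-op without them); note that the 134217728-byte roll size is simply 128 MiB:

```shell
# hdfs.rollSize = 134217728 is 128 MiB expressed in bytes
echo $((128 * 1024 * 1024))

# Start the agent; --name must match the property prefix (a1)
command -v flume-ng >/dev/null && \
  flume-ng agent --conf conf --conf-file flume2.conf \
    --name a1 -Dflume.root.logger=INFO,console & 

# After a roll (60 s or 128 MiB), list today's output directory;
# the %Y%m%d escape in hdfs.path expands like date +%Y%m%d
command -v hdfs >/dev/null && \
  hdfs dfs -ls /flume/$(date +%Y%m%d) || true
```

Because rollCount is 0 and rollSize/rollInterval are both set, whichever of the size or time threshold is hit first triggers the roll.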
Combining Flume with Kafka to upload files to HDFS
Original article: https://www.cnblogs.com/han-guang-xue/p/9966078.html