分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > IT知识

Flume整合Kafka完成实时数据采集

发布时间:2023-09-06 02:31责任编辑:白小东关键词:暂无标签

agent选择

agent1 exec source + memory channel + avro sink

agent2 avro source + memory channel 

模拟实际工作中的场景,agent1 为A机器,agent2 为B机器。

avro source: 监听avro端口,并且接收来自外部avro信息,

avro sink:一般用于跨节点传输,主要绑定数据移动目的地的ip和port

 

 

在创建agent2配置文件

cd /app/flume/flume/conf

vi test-avro-memory-kafka.conf

avro-memory-kafka.sources = avro-sourceavro-memory-kafka.sinks = kafka-sinkavro-memory-kafka.channels = memory-channel avro-memory-kafka.sources.avro-source.type = avroavro-memory-kafka.sources.avro-source.bind= dblab-VirtualBoxavro-memory-kafka.sources.avro-source.port=44444 avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSinkavro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = dblab-VirtualBox:9092avro-memory-kafka.sinks.kafka-sink.kafka.topic = hello_topicavro-memory-kafka.sinks.kafka-sink.batchSize = 5avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1 avro-memory-kafka.channels.memory-channel.type = memory avro-memory-kafka.sources.avro-source.channels = memory-channelavro-memory-kafka.sinks.kafka-sink.channel = memory-channel

启动agent2

flume-ng agent --name avro-memory-kafka -c conf -f conf/test-avro-memory-kafka.conf -Dflume.root.logger=INFO,console

这里一定要等agent2的avro-source启动成功,已经监听了自己的44444端口,才能去启动agent1,不然agent1启动会被拒绝连接

 

创建agent1配置文件

cd /app/flume/flume/conf

vi test-exec-memory-avro.conf

exec-memory-avro.sources = exec-sourceexec-memory-avro.sinks = avro-sinkexec-memory-avro.channels = memory-channelexec-memory-avro.sources.exec-source.type = execexec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.logexec-memory-avro.sources.exec-source.shell = /bin/sh -cexec-memory-avro.sinks.avro-sink.type = avroexec-memory-avro.sinks.avro-sink.hostname = dblab-VirtualBoxexec-memory-avro.sinks.avro-sink.port = 44444exec-memory-avro.channels.memory-channel.type = memoryexec-memory-avro.sources.exec-source.channels = memory-channelexec-memory-avro.sinks.avro-sink.channel = memory-channel

 

启动agent2

flume-ng agent --name exec-memory-avro -c conf -f conf/test-exec-memory-avro.conf -Dflume.root.logger=INFO,console

接下来对Kafka进行配置

先启动Kafka

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

创建hello_topic

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 启动生产者

kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic

启动一个Kafka的客户端来消费,测试是否启动成功

kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic

向agent1的exec-source监听的文件中写数据

查看Kafka的客户端是否通过flume消费到数据

至此完成Flume整合Kafka完成实时数据采集

Flume整合Kafka完成实时数据采集

原文地址:https://www.cnblogs.com/aishanyishi/p/10326042.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved