Flume Course Notes (continued)

Published: 2023-09-06 02:09 | Editor: 傅花花 | Keywords: none
http://flume.apache.org/

Installation
1. Upload the tarball
2. Unpack it
3. Set the JDK directory in conf/flume-env.sh
   Note: if transferring very large files triggers an out-of-memory error, adjust the JAVA_OPTS setting in this file
4. Verify the installation: ./flume-ng version
5. Set the environment variable: export FLUME_HOME=/home/apache-flume-1.6.0-bin

Source, Channel, and Sink types

Flume Sources
Source type                | Description
Avro Source                | Speaks the Avro protocol (in fact Avro RPC); built in
Thrift Source              | Speaks the Thrift protocol; built in
Exec Source                | Produces data from the standard output of a Unix command
JMS Source                 | Reads data from a JMS system (queues, topics)
Spooling Directory Source  | Watches a directory for new data
Twitter 1% firehose Source | Continuously downloads Twitter data through the API; experimental
Netcat Source              | Listens on a port and turns each line of text flowing through it into an Event
Sequence Generator Source  | Sequence generator; produces sequence data
Syslog Sources             | Read syslog data and produce Events; both UDP and TCP are supported
HTTP Source                | Accepts data via HTTP POST or GET; JSON and BLOB representations are supported
Legacy Sources             | Compatibility with Sources from the old Flume OG (0.9.x)

Flume Channels
Channel type               | Description
Memory Channel             | Events are stored in memory
JDBC Channel               | Events are stored in a persistent store; Derby is currently the only built-in backend
File Channel               | Events are stored in files on disk
Spillable Memory Channel   | Events are held in memory and spill to disk files once the in-memory queue is full
Pseudo Transaction Channel | For testing only
Custom Channel             | A user-supplied Channel implementation

Flume Sinks
Sink type           | Description
HDFS Sink           | Writes data to HDFS
Logger Sink         | Writes data to the log
Avro Sink           | Converts data into Avro Events and sends them to the configured RPC port
Thrift Sink         | Converts data into Thrift Events and sends them to the configured RPC port
IRC Sink            | Replays data on IRC
File Roll Sink      | Stores data on the local filesystem
Null Sink           | Discards all data
HBase Sink          | Writes data to an HBase database
Morphline Solr Sink | Sends data to a Solr search server (or cluster)
ElasticSearch Sink  | Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink   | Writes data to a Kite Dataset; experimental
Custom Sink         | A user-supplied Sink implementation

Case 1. A simple example
http://flume.apache.org/FlumeUserGuide.html#a-simple-example

Configuration file (simple.conf):
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

Start Flume:
flume-ng agent -n a1 -c conf -f simple.conf -Dflume.root.logger=INFO,console

Install telnet to test:
yum install telnet
To leave a telnet session, press Ctrl+] and then type quit.

Memory Channel settings:
  capacity: maximum number of events the channel can store; defaults to 100
  transactionCapacity: maximum number of events taken from a source or handed to a sink per transaction; also defaults to 100
  keep-alive: how long an event add or remove may wait on the channel
  byte**: limits on the event byte size; only the event body is counted

Case 2. Two Flume agents chained together
On node01, configuration file (simple.conf2):
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node1
a1.sources.r1.port = 44444

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node2
a1.sinks.k1.port = 60000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

On node02, install Flume as well (steps omitted) and use this configuration file (avro.conf):
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 60000

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

Start node02's Flume first:
flume-ng agent -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console
Then start node01's Flume:
flume-ng agent -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
Open a telnet session to test; the events appear on node02's console.

Case 3. Exec Source
http://flume.apache.org/FlumeUserGuide.html#exec-source

Configuration file (exec.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/flume.exec.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

Start Flume:
flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console

Create an empty file for the demo:
touch flume.exec.log
Append data to it in a loop:
for i in {1..50}; do echo "$i hi flume" >> flume.exec.log; sleep 0.1; done

Case 4. Spooling Directory Source
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source

Configuration file (spool.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

Start Flume:
flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

Copy a file into the directory for the demo:
mkdir logs
cp flume.exec.log logs/

Case 5. HDFS Sink
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

Configuration file (hdfs.conf):
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true

# Describe the sink (relative to the spool example above, only this block changes)
# a1.sinks.k1.type = logger
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bjsxt/flume/%Y-%m-%d/%H%M
## Roll a new file every 60 seconds, or once the current file reaches rollSize bytes
# roll after this many events; 0 = never roll by event count
a1.sinks.k1.hdfs.rollCount = 0
# roll after this many seconds; 0 = never roll by time
a1.sinks.k1.hdfs.rollInterval = 60
# roll after this many bytes (10240 = 10 KB); 0 = never roll by size
a1.sinks.k1.hdfs.rollSize = 10240
# if an open temporary file receives no data for this many seconds, close it and rename it to its final name
a1.sinks.k1.hdfs.idleTimeout = 3
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true
## Create a new directory every five minutes:
# whether to "round" the timestamp (always rounding down); when enabled it affects every time escape except %t
a1.sinks.k1.hdfs.round = true
# the value to round down to
a1.sinks.k1.hdfs.roundValue = 5
# the unit to round in: second, minute, or hour
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

Create the HDFS directory:
hadoop fs -mkdir /flume
Start Flume:
flume-ng agent -n a1 -c conf -f hdfs.conf -Dflume.root.logger=INFO,console
Inspect the files on HDFS:
hadoop fs -ls /flume/...
hadoop fs -get /flume/...

Homework:
1. How can Flume collect request data from a Java application? Use RPC (the Avro and Thrift Sources accept RPC clients).
2. How is this done in a real project? Logs are stored under the /log/ directory, with one yyyyMMdd subdirectory per day holding that day's data.
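For homework item 2, a minimal sink sketch with one directory per day, assuming the HDFS Sink from Case 5; the hdfs://bjsxt namespace and the a1/k1 names are carried over from that example, and the /log path comes from the homework statement:

```
# Sketch only: each day's events land under /log/<yyyyMMdd>/
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bjsxt/log/%Y%m%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```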
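The generator loop from Case 3 can be dry-run without Flume to confirm what the Exec Source's tail -F command would see. A minimal sketch, assuming a /tmp path instead of /home and dropping the sleep so the run finishes instantly:

```shell
log=/tmp/flume.exec.log          # illustrative path; Case 3 uses /home/flume.exec.log
: > "$log"                       # start from an empty file, like the touch step
for i in $(seq 1 50); do
  echo "$i hi flume" >> "$log"   # same lines the loop in Case 3 appends
done
wc -l < "$log"                   # tail -F would have picked up all 50 lines
```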
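The round settings in Case 5 always round the directory timestamp down. A quick shell sketch of the arithmetic (the 10:13 example time is made up for illustration):

```shell
# With roundValue=5 and roundUnit=minute, the %H%M escape in the sink
# path is rounded down to the nearest 5 minutes: an event arriving at
# 10:13 is written under the 1010 directory, not 1013.
hour=10
minute=13
rounded=$(( minute / 5 * 5 ))    # integer division drops the remainder
printf '%02d%02d\n' "$hour" "$rounded"
```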

  


Original article: https://www.cnblogs.com/huiandong/p/9448182.html
