前一篇中数据源采用的是从一个socket中拿数据,有点属于“旁门左道”,正经的是从kafka等消息队列中拿数据!
主要支持的source,由官网得知如下:
获取数据的形式包括推送push和拉取pull
一、spark streaming整合flume
1.push的方式
更推荐的是pull的拉取方式
引入依赖:
????<dependency> ???????????<groupId>org.apache.spark</groupId> ???????????<artifactId>spark-streaming-flume_2.10</artifactId> ???????????<version>${spark.version}</version> ???????</dependency>
编写代码:
package com.streamingimport org.apache.spark.SparkConfimport org.apache.spark.streaming.flume.FlumeUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}/** ?* Created by ZX on 2015/6/22. ?*/object FlumePushWordCount { ?def main(args: Array[String]) { ???val host = args(0) ???val port = args(1).toInt ???val conf = new SparkConf().setAppName("FlumeWordCount")//.setMaster("local[2]") ???// 使用此构造器将可以省略sc,由构造器构建 ???val ssc = new StreamingContext(conf, Seconds(5)) ???// 推送方式: flume向spark发送数据(注意这里的host和Port是streaming的地址和端口,让别人发送到这个地址) ???val flumeStream = FlumeUtils.createStream(ssc, host, port) ???// flume中的数据通过event.getBody()才能拿到真正的内容 ???val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1)) ???val results = words.reduceByKey(_ + _) ???results.print() ???ssc.start() ???ssc.awaitTermination() ?}}
flume-push.conf——flume端配置文件:
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# sourcea1.sources.r1.type = spooldira1.sources.r1.spoolDir = /export/data/flumea1.sources.r1.fileHeader = true# Describe the sinka1.sinks.k1.type = avro#这是接收方a1.sinks.k1.hostname = 192.168.31.172a1.sinks.k1.port = 8888# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
2.pull的方式
属于推荐的方式,通过streaming来主动拉取flume产生的数据
编写代码:(依赖同上)
package com.streamingimport java.net.InetSocketAddressimport org.apache.spark.SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.flume.FlumeUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}object FlumePollWordCount { ?def main(args: Array[String]) { ???val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]") ???val ssc = new StreamingContext(conf, Seconds(5)) ???//从flume中拉取数据(flume的地址),通过Seq序列,里面可以new多个地址,从多个flume地址拉取 ???val address = Seq(new InetSocketAddress("172.16.0.11", 8888)) ???val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK) ???val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1)) ???val results = words.reduceByKey(_+_) ???results.print() ???ssc.start() ???ssc.awaitTermination() ?}}
配置flume
通过拉取的方式需要flume的lib目录中有相关的JAR(要通过spark程序来调flume拉取),通过官网可以得知具体的JAR信息:
配置flume:
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# sourcea1.sources.r1.type = spooldira1.sources.r1.spoolDir = /export/data/flumea1.sources.r1.fileHeader = true# Describe the sink(配置的是flume的地址,等待拉取)a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSinka1.sinks.k1.hostname = mini1a1.sinks.k1.port = 8888# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
启动flume,然后启动IDEA中的spark streaming:
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 ?-Dflume.root.logger=INFO,console// -D后参数可选
大数据入门第二十四天——SparkStreaming(2)与flume、kafka整合
原文地址:https://www.cnblogs.com/jiangbei/p/8856750.html