Big Data Primer Day 24: Spark Streaming (2) Integration with Flume and Kafka


In the previous post the data source was a socket, which is something of a toy setup; the proper way is to take data from a message queue such as Kafka.

The main supported sources are listed in the official documentation.

  Data can be obtained in two forms: push and pull.

I. Integrating Spark Streaming with Flume

  1. The push approach

    The pull approach described below is the more recommended one.

    Add the dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>

    Write the code:

package com.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by ZX on 2015/6/22.
  */
object FlumePushWordCount {
  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    val conf = new SparkConf().setAppName("FlumeWordCount") //.setMaster("local[2]")
    // With this constructor there is no need to create a SparkContext yourself; it is built internally
    val ssc = new StreamingContext(conf, Seconds(5))
    // Push mode: Flume sends data to Spark. host/port here are the address and port
    // this streaming receiver listens on, i.e. where Flume should send events
    val flumeStream = FlumeUtils.createStream(ssc, host, port)
    // The real payload of a Flume record is obtained via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
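    As a rough usage sketch (the jar path and master URL below are assumptions, and the spark-streaming-flume classes must also be on the application's classpath, e.g. via an assembly jar), the push job could be submitted like this; the two arguments are the host and port the receiver listens on, which must match the avro sink in flume-push.conf:

bin/spark-submit \
  --class com.streaming.FlumePushWordCount \
  --master spark://mini1:7077 \
  /path/to/streaming-wordcount.jar \
  192.168.31.172 8888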

    flume-push.conf, the configuration file on the Flume side:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = avro
# this is the receiving side (the Spark Streaming host and port)
a1.sinks.k1.hostname = 192.168.31.172
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
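    For completeness, this agent would be started the same way as the pull-mode one shown later, only pointing at this file (paths depend on your Flume install); in push mode the Spark Streaming receiver should already be running, otherwise the avro sink has nothing to connect to:

bin/flume-ng agent -c conf -f conf/flume-push.conf -n a1 -Dflume.root.logger=INFO,console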

  2. The pull approach

    This is the recommended approach: the streaming application actively pulls the data that Flume produces.

    Write the code (same dependency as above):

package com.streaming

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Pull data from Flume (this is the Flume agent's address); the Seq can hold
    // several InetSocketAddress entries to pull from multiple Flume agents
    val address = Seq(new InetSocketAddress("172.16.0.11", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
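    As a side note on the comment above, a minimal sketch of pulling from several Flume agents at once (the host names are hypothetical; each agent must run its own SparkSink on the given port):

import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils

// ssc is the StreamingContext created as above; mini1/mini2 are placeholder hosts
val addresses = Seq(
  new InetSocketAddress("mini1", 8888),
  new InetSocketAddress("mini2", 8888)
)
val multiStream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)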

      Configure Flume

  The pull approach requires the relevant JARs in Flume's lib directory (the Spark program has to call into Flume to pull the data); the specific JARs can be found on the official site:
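  From the Spark Streaming + Flume integration guide, the artifacts that have to be placed on Flume's classpath for the pull approach are roughly the following; the exact versions must match your Spark and Scala build, so verify them against the guide for your release:

groupId = org.apache.spark, artifactId = spark-streaming-flume-sink_2.10, version = ${spark.version}
groupId = org.scala-lang, artifactId = scala-library, version = <your Scala 2.10.x version>
groupId = org.apache.commons, artifactId = commons-lang3, version = <as listed in the guide>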

  

    Configure Flume (flume-poll.conf):

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink (this is the Flume agent's own address, waiting to be pulled from)
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = mini1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    Start Flume first, then start the Spark Streaming application in IDEA:

bin/flume-ng agent -c conf -f conf/flume-poll.conf -n a1 -Dflume.root.logger=INFO,console   # the -Dflume.root.logger option is optional


Original article: https://www.cnblogs.com/jiangbei/p/8856750.html
