Spark Streaming流式处理(四)

Spark Streaming流式处理

一、SparkStreaming概述

1.1、SparkStreaming是什么?

image.png

Spark Streaming类似于Apache Storm,用于流式数据的处理。根据官方文件的介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka,Flume,Twitter,ZeroMQ和简单的TCP套接字等等。等待输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也可以存储在很多地方,如HDFS,数据库等。另外,Spark Streaming也能够和MLlib(机器学习)以及GraphX完美融合。

image.png


1.2、为什么要学习Spark Streaming?

(1)、易用

            image.png


(2)、容错

            image.png

(3)、易整合到Spark体系

            image.png

1.3、Spark和Storm的对比

Spark
Storm
image.pngimage.png
开发语言:Scala
开发语言:Clojure
编程模型:DStream编程模型:Spout/Bolt
image.pngimage.png

二、Spark Streaming原理

2.1、Spark Streaming原理

Spark Streaming是基于Spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。

2.2、Spark Streaming作业执行流程

image.png

  • Spark Streaming作业的执行过程

(1)、客户端提交作业后启动Driver,Driver是一个spark作业的主进程,负责这个作业的解析,并调度作业任务到executor中。

(2)、每个作业包包含多个Executor,每个Executor以线程的方式运行task,SparkStreaming至少包含一个receiver task。

(3)、Receiver接收数据后生成Block,并把BlockId汇报给Driver,然后备份到另外一个Executor上。

(4)、ReceiverTracker维护Receiver汇报BlockId。

(5)、Driver定时启动JobGenerator,根据DStream的关系生成逻辑RDD,然后创建Jobset,交给JobScheduler。

(6)、JobScheduler负责调度Jobset,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,每个stages包含一个d到多个task。

(7)、TaskScheduler负责把task调度到Executor上,并维护task的运行状态。

(8)、当task ,stages,Jobset完成后,单个batch才算完成。

  • 计算流程

                Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加,或者存储到外部设备。下图显示了Spark Streaming的整个流程。

image.png


  • 对于Spark Streaming来说,其RDD的传承关系如下图所示,图中的每一个椭圆形表示一个RDD,椭圆形中的每个圆形代表一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每一个Batch Size所产生的中间结果RDD。我们可以看到图中的每一个RDD都是通过lineage相连接的,由于Spark Streaming输入数据可以来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每一个数据流拷贝两份到其他的机器)都能保证容错性。所以RDD中任意的Partition出错,都可以并行地在其他机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。 

image.png


  • 实时性

        对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解,以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2秒钟之间(Storm目前最小的延迟是100ms左右),所以Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。

  • 扩展性与吞吐量

        Spark目前在EC2上已能够线性扩展到100个节点(每个节点4Core),可以以数秒的延迟处理6GB/s的数据量(60M records/s),其吞吐量也比流行的Storm高2~5倍,下图是Berkeley利用WordCount和Grep两个用例所做的测试,在Grep这个测试中,Spark Streaming中的每个节点的吞吐量是670k records/s,而Storm是115k records/s。

image.png

三、DStream

3.1、什么是DStream

Discretized Stream是Spark  Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:

image.png

对数据的操作也是按照RDD为单位来进行的

image.png

Spark Streaming使用数据源产生的数据流创建DStream,也可以在已有的DStream上使用一些操作来创建新的DStream。

它的工作流程像下面的图所示一样,接收到实时数据后,给数据分批次,然后传给Spark Engine处理,最后生成该批次的结果。

image.png

3.2、DStream相关操作

DStream上的原语与RDD的类似,分为TransTransformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey(),transform()以及各种Window相关的原语。

3.2.1、Transformations on DStream

Transformation

Meaning

map(func)

DStream中的各个元素进行func函数操作,然后返回一个新的DStream

flatMap(func)

map方法类似,只不过各个输入项可以被输出为零个或多个输出项

filter(func)

过滤出所有函数func返回值为trueDStream元素并返回一个新的DStream

repartition(numPartitions)

增加或减少DStream中的分区数,从而改变DStream的并行度

union(otherStream)

将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.

count()

通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream

reduce(func)

对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.

countByValue()

对于元素类型为KDStream,返回一个元素为(K,Long)键值对形式的新的DStreamLong对应的值为源DStream中各个RDDkey出现的次数

reduceByKey(func, [numTasks])

利用func函数对源DStream中的key进行聚合操作,然后返回新的(KV)对构成的DStream

join(otherStream, [numTasks])

输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(VW)类型的DStream

cogroup(otherStream, [numTasks])

输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream

transform(func)   

通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD

updateStateByKey(func)

根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream

特殊的Transformations

(1)、updateStateByKey Operation

updateStateByKey原语用于记录历史记录,前面的WordCount案例就用到了该特性。若不用updateStateByKey来更新状态,那么每次数据进来后分析完成后,结果输出后将不在保存。

(2)、Window Operations(开窗函数)

滑动窗口转换操作

        滑动窗口转换操作的计算过程如下图所示:我们可以事先设定一个滑动窗口的长度(也就是窗口持续的时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后,就可以让窗口按照指定时间间隔在源DStream上滑动,每次窗口停放的位置上,都会有一部分DStream被框如窗口内,形成一小段的DStream,这是,就可以启动对这个小段DStream的计算。

image.png

(1)、红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流;

(2)、这里面每一个time都是时间单元,在官方的例子中,每个window size是3time unit,而且每个2个时间单元,窗口会slide一次。

所以,基于窗口的操作,需要指定2个参数:

  • window length  - The duration of the window(3 in the figure)

  • slide interval     - The interval at which the window-based operation is performed(2 in the figure)

    a.窗口大小,一段时间内数据的容器。

    b.滑动间隔,每隔多久计算一次。

3.2.2、Output Operations on DStreams

Output Operations可以将DStream的数据输出到外部的数据库或文件系统,当某个Output Operations原语被调用时(与RDD的Action相同),Streaming程序才会开始真正的计算过程。

Output   Operation

Meaning

print()

打印到控制台

saveAsTextFiles(prefix, [suffix])

保存流的内容为文本文件,文件名为

"prefix-TIME_IN_MS[.suffix]".

saveAsObjectFiles(prefix, [suffix])

保存流的内容为SequenceFile,文件名为

 "prefix-TIME_IN_MS[.suffix]".

saveAsHadoopFiles(prefix, [suffix])

保存流的内容为hadoop文件,文件名为

 "prefix-TIME_IN_MS[.suffix]".

foreachRDD(func)

Dstream里面的每个RDD执行func

3.3、DStream操作实战

3.3.1、SparkStreaming接收socket数据,实现单词计数累加

  • 架构图

image.png

  • 实现流程

        安装并启动生产者

        首先在node7上面安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。

yum install -y nc

        启动一个服务器并监听9999端口

nc -lk 9999

        向指定端口发送数据

  • 编写Spark Streaming程序

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
  •  SparkStreamSocket类:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 利用SparkStreaming接收socket数据,实现单词统计WordCount
  *
  * @author:admin
  *
  */
object SparkStreamSocket {
  def main(args: Array[String]): Unit = {
    //todo:1、构建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamSocket").setMaster("local[2]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    //todo:3、构建StreamingContext,需要两个参数,第一个是SparkContext,第二个是batch size
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:4、使用StreamingContext.socketTextStream
    val inputDStream = ssc.socketTextStream("node7",9999)
    //todo:5、切分每一行
    val wordsDStream = inputDStream.flatMap(_.split(" "))
    //todo:6、每个单词记为1
    val wordAndOne = wordsDStream.map((_,1))
    //todo:7、相同出现的次数累加
    val result = wordAndOne.reduceByKey(_+_)
    //todo:8、打印输出
    result.print()
    //todo:9、开始计算
    ssc.start()
    //todo:10、不断接收数据,程序一直阻塞,等待数据进入
    ssc.awaitTermination()
  }
}
  • 运行结果:

image.png

image.png

SparkStreaming每个5s计算一次当前5s内的数据,然后将每个批次的结果数据累加输出。

3.3.2、使用updateStateByKey进行单词统计

  • SparkStreamingSocketTotal类:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 利用updateStateByKey进行单词统计
  * @author:admin
  *
  */
object SparkStreamingSocketTotal {

  //todo:该方法需要两个参数
  //todo:nowBatch表示当前批次中单词出现的次数,(hadoop,1)(hadoop,1)(spark,1)
  //todo:historyCount之前批次中单词出现的次数
  def updateFunc(nowBatch:Seq[Int], historyCount:Option[Int]) :Option[Int] = {
      //todo:把当前批次的结果和之前批次出现的次数进行累加
    val newCount = nowBatch.sum+historyCount.getOrElse(0)
    //返回
    Some(newCount)
  }

  def main(args: Array[String]): Unit = {
    //todo:1、构建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingSocketTotal").setMaster("local[2]")
    //todo:2、创建sparkContext对象
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    //todo:3、如果要将中间结果进行保存,那么需要设置checkpoint
    sc.setCheckpointDir("./ck2017")
    //todo:4、构建StreamingContext,需要两个参数,第一个是SparkConf对象,第二个是batch size
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:5、调用StreamingContext.socketTextStream
    val dstream = ssc.socketTextStream("node7",9999)
    //todo:6、切分每一行,并且每个单词记为1
    val wordAndOne = dstream.flatMap(_.split(" ")).map((_,1))
    //todo:7、把所有批次的结果进行累加
    val result = wordAndOne.updateStateByKey(updateFunc)
    //todo:8、打印输出
    result.print()
    //todo:9、开始计算
    ssc.start()
    //todo:10、不断接收数据
    ssc.awaitTermination()
  }
}
  • 运行结果:

image.png

image.png

3.3.3、使用SparkStreaming的开窗函数reduceByKeyAndWindow实现单词统计

  • SparkStreamingSocketWindow类:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 利用SparkStreaming的开窗函数进行单词统计
  * @author:admin
  *
  */
object SparkStreamingSocketWindow {
  def main(args: Array[String]): Unit = {
    //todo:1、构建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingSocketWindow").setMaster("local[2]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    //todo:3、构建StreamingContext,需要两个参数,第一个是SparkContext,第二个是batch size
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:4、使用StreamingContext.socketTextStream
    val inputDStream = ssc.socketTextStream("node7",9999)
    //todo:5、切分每一行
    val wordsDStream = inputDStream.flatMap(_.split(" "))
    //todo:6、每个单词记为1
    val wordAndOne = wordsDStream.map((_,1))
    //todo:7、相同出现的次数累加
    //todo:reduceByKeyAndWindow函数三个参数:
    // todo:(1)传入一个作用函数
    // todo:(2)表示窗口的大小,即每次想要统计多长时间内的数据
    // todo:(3)滑动窗口的时间间隔,也就是每隔多长时间计算一次
    val window = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(5))
    //todo:8、打印输出
    window.print()
    //todo:9、开始计算
    ssc.start()
    //todo:10、不断接收数据,程序一直阻塞,等待数据进入
    ssc.awaitTermination()
  }
}
  • 运行结果:

image.png

image.png

注意:当窗口大小 > 窗口滑动时间间隔        数据会被重复计算

          当窗口大小 < 窗口滑动时间间隔        数据会丢失

          当窗口大小 = 窗口滑动时间间隔        数据不会被重复计算也不会丢失

结论:(1)、窗口大小和滑动窗口时间间隔是当前批次整数倍

          (2)、窗口大小和滑动时间间隔相等

3.3.4、SparkStreaming开窗函数统计一定时间内的热门词汇

代码如下:

  • SparkStreamingHotWords类:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 利用sparkStreaming实现热门词汇的统计
  *
  * @author:admin
  *
  */
object SparkStreamingHotWords {
  def main(args: Array[String]): Unit = {
    //todo:1、构建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingHotWords").setMaster("local[2]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    //todo:3、构建StreamingContext,需要两个参数,第一个是SparkContext,第二个是batch size
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:4、使用StreamingContext.socketTextStream
    val inputDStream = ssc.socketTextStream("node7",9999)
    //todo:5、切分每一行
    val wordsDStream = inputDStream.flatMap(_.split(" "))
    //todo:6、每个单词记为1
    val wordAndOne = wordsDStream.map((_,1))
    //todo:7、相同出现的次数累加
    //todo:reduceByKeyAndWindow函数三个参数:
    // todo:(1)传入一个作用函数
    // todo:(2)表示窗口的大小,即每次想要统计多长时间内的数据
    // todo:(3)滑动窗口的时间间隔,也就是每隔多长时间计算一次
    val window = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(5),Seconds(5))
    //todo:8、打印输出
    val finalDstream = window.transform(rdd => {
      //todo:按照单词出现的次数降序排序
      val sorted = rdd.sortBy(x => x._2, false)
      //todo:取出前3位
      val result = sorted.take(3)
      println("=============打印前3位开始==============")
      //todo:打印输出
      result.foreach(println)
      println("=============打印前3位结束==============")
      //todo:返回值
      sorted
    })
    println("================所有单词降序排序start=================")
    //todo:打印
    finalDstream.print()
    println("================所有单词降序排序end=================")
    //todo:9、开始计算
    ssc.start()
    //todo:10、不断接收数据,程序一直阻塞,等待数据进入
    ssc.awaitTermination()
  }
}
  • 运行结果:

image.png

image.png

四、Spark Streaming整合flume实战

flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,SparkStreaming做实时处理。

Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume中Poll拉取数据

4.1、Poll拉取数据方式

(1)、安装Flume1.6以上,此处安装flume1.8

  • 下载flume1.8的安装包apache-flume-1.8.0-bin.tar.gz

    image.png

  • 解压缩安装包到/usr/local/src目录下:

tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local/src/
  • 加压缩后,进入/usr/local/src目录,重命名apache-flume-1.8.0-bin目录位flume:

mv apache-flume-1.8.0-bin/ flume
  • 进入conf目录,重命名flume-env.sh.template:

mv flume-env.sh.template flume-env.sh
  • 编辑flume-env.sh文件,在里面配置JAVA_HOME:

export JAVA_HOME=/usr/local/src/jdk

(2)、下载依赖包spark-streaming-flume-sink_2.11-2.0.2.jar(千万不要下载成了spark-streaming-flume_2.11-2.0.2.jar,否则会报错,错误信息下面会给出),并将依赖包放到flume的lib目录下

(3)、在conf目录下新建文件flume-poll-spark.conf,内容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=node7
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000

(4)、编写代码:

  • 添加依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
  • SparkStreamingPollFlume类:

import java.net.InetSocketAddress

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

/**
  * 利用SparkStreaming接收flume中的数据---->poll拉模式
  * 注意:拉模式下启动是有顺序的,需要先启动flume,然后在启动程序
  * @author:admin
  *
  */
object SparkStreamingPollFlume {

  def updateFunc(a:Seq[Int], b:Option[Int]):Option[Int] = {
    Some(a.sum+b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {
    //todo:1、创建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingPollFlume").setMaster("local[2]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //todo:有状态需要设置checkpoint
    sc.setCheckpointDir("./flume-poll")
    //todo:3、创建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:4、需要提供flume所在服务器的ip和端口,这里可以接收多个flume agent
    val addresses = Seq(new InetSocketAddress("node7",8888))
    //todo:5、利用FlumeUtils工具类的createPollingStream方法来创建DStream
    val dstream = FlumeUtils.createPollingStream(ssc,addresses,StorageLevel.MEMORY_AND_DISK)
    //todo:6、获取DStream中的数据,存放的真实数据
    val flumeDStream = dstream.map(x=>new String(x.event.getBody.array()))
    //todo:7、切分每一行
    val wordAndOne = flumeDStream.flatMap(_.split(" ")).map((_,1))
    //todo:8、累加相同单词出现的次数
    val result = wordAndOne.updateStateByKey(updateFunc)
    //todo:9、打印
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

(5)、启动flume,此处需要注意,拉模式下需要先启动flume,再启动应用程序,否则会报错,在flume目录下启动:

bin/flume-ng agent -n a1 -c conf -f conf/flume-poll-spark.conf -Dflume.root.logger=INFO,console

出现报错信息如下:

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink
	at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:70)
	at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:408)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.flume.sink.SparkSink
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:264)
	at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:68)
	... 11 more

原因:

        flume1.8下面的lib包里面自带的scala-library的jar包是2.10版本的,和spark-streaming-flume-sink_2.11-2.0.2.jar版本不一致,所以导致启动flume agent失败。

解决方法:

        进入spark安装目录下的jars目录下,将scala-library-2.11.8.jar拷贝到flume的lib目录下,同时将scala-library-2.10.5.jar删除。

注意:上面在下载依赖包的时候,要下载spark-streaming-flume-sink包,而不是spark-streaming-flume包,否则也会报上面的错误。

(6)、在/root/data目录下有一个words.txt的文件,内容如下:

spark hadoop hadoop 
spark hive storm
storm hive spark
hadoop storm

(7)、启动程序,结果如下:

image.png

(8)、将一个people.txt文件移动到/root/data目录,文件内容如下:

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40

(9)、结果如下:

image.png

4.2、Push推数据模式

  • 配置文件flume-push-spark.conf,内容如下:

#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
#程序运行的主机
a1.sinks.k1.hostname=192.168.25.1
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000

注意:文件中的hostname和port指的是程序运行的服务器的ip和端口

  • 编写代码,SparkStreamingPushFlume类:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

/**
  * 利用SparkStreaming接收flume推送过来的数据
  * 注意:此处的启动顺序和拉模式相反,先启动应用程序,再启动flume
  * @author:admin
  *
  */
object SparkStreamingPushFlume {
  def updateFunc(a:Seq[Int], b:Option[Int]):Option[Int] = {
    Some(a.sum+b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {
    //todo:1、创建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingPushFlume").setMaster("local[2]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //todo:有状态需要设置checkpoint
    sc.setCheckpointDir("./flume-push")
    //todo:3、创建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:5、利用FlumeUtils工具类的createPollingStream方法来创建DStream
    val dstream = FlumeUtils.createStream(ssc,"192.168.25.1",8888)
    //todo:6、获取DStream中的数据,存放的真实数据
    val flumeDStream = dstream.map(x=>new String(x.event.getBody.array()))
    //todo:7、切分每一行
    val wordAndOne = flumeDStream.flatMap(_.split(" ")).map((_,1))
    //todo:8、累加相同单词出现的次数
    val result = wordAndOne.updateStateByKey(updateFunc)
    //todo:9、打印
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 启动程序(注意:push推模式是先启动程序,再启动flume,与poll拉模式启动顺序相反)

image.png

  • 启动flume:

bin/flume-ng agent -n a1 -c conf -f conf/flume-push-spark.conf -Dflume.root.logger=INFO,console
  • 移动words.txt文件到/root/data目录下,文件内容如下:

spark hadoop hadoop 
spark hive storm
storm hive spark
hadoop storm
  • 程序结果如下:

image.png

  • 在实际工作中,优先选择poll拉模式,因为一旦程序运行的服务器出现问题(例如:宕机等),则push推模式推送数据就可能推送不过来,数据就会丢失;而拉模式时,即便是程序运行的服务器出现问题,当程序重新启动时,又会继续去flume上拉取数据,不会出现丢数据的问题

五、Spark Streaming整合Kafka

Kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时计算框架实时地读取Kafka中的数据然后进行计算。

在spark1.3版本后,KafkaUtils里面提供了两个创建DStream的方法,一种为KafkaUtils.createDstream,另一种为KafkaUtils.createDirectStream。

5.1、KafkaUtils.createDstream方式

构造函数为KafkaUtils.createDstream(ssc,[zk],[consumer group id],[per-topic,partitions])使用了receives来接收数据,利用的是Kafka高层次的消费者API,对于所有的receives接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启动WAL(预写)日志,该日志存储在HDFS上。

A、创建一个receiver来对kafka进行定时拉取数据,ssc的RDD分区和kafka的topic分区不是一个概念,故如果增加特定主题消费的线程数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量。

B、对于不同的group和topic可以使用多个receiver创建不同的DStream。

C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createDstream(...,StorageLevel.MEMORY_AND_DISK_SER)

5.1.1、KafkaUtils.createDstream实战

(1)、添加kafka的pom依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

(2)、启动Zookeeper集群和Kafka集群(此处Zookeeper集群和kafka集群的安装省略)

(3)、创建kafka的topic:

kafka-topics.sh --create --zookeeper node7:2181 --replication-factor 1 --partitions 3 --topic kafka_spark

image.png

(4)、发送消息到topic:

kafka-console-producer.sh --broker-list node7:9092 --topic  kafka_spark

image.png

(5)、编写代码,SparkStreamingKafka_Receiver类:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * 利用SparkStreaming接收Kafka中的数据
  *
  * @author:admin
  *
  */
object SparkStreamingKafka_Receiver {

  def updateFunc(a:Seq[Int],b:Option[Int]) :Option[Int] = {
    Some(a.sum+b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {
    //todo:1、构建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingKafka_Receiver").setMaster("local[2]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    sc.setCheckpointDir("./kafka-receiver")
    //todo:3、构建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:4、指定zookeeper
    val zkServer="node7:2181,node8:2181,node9:2181"
    //todo:5、指定groupId
    val groupId="kafka_spark_consumer"
    //todo:6、指定topics,map格式,内容(topic名称,分区数)  这里可以利用一个消费者来进行消费
    val topics=Map("kafka_spark"->3)
    //todo:7、通过使用KafkaUtils.createStream来接收kafka topic中的数据,生成DStream
    val dstream = KafkaUtils.createStream(ssc,zkServer,groupId,topics,StorageLevel.MEMORY_AND_DISK)
    //todo:8、获取kafka中topic的内容
    val kafkaDStream = dstream.map(x=>x._2)
    //todo:9、切分每一行,每个单词记为1
    val wordAndOne= kafkaDStream.flatMap(_.split(" ")).map((_,1))
    //todo:10、统计相同单词出现的次数
    val result = wordAndOne.updateStateByKey(updateFunc)
    //todo:11、打印
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 运行结果:

image.png

image.png

  • 可以稍微对其进行改造,改成同时开启多个线程进行消费,最后合并结果,如下:

/**
  * 利用SparkStreaming接收Kafka中的数据
  *
  * @author:admin
  *
  */
object SparkStreamingKafka_Receiver {

  def updateFunc(a:Seq[Int],b:Option[Int]) :Option[Int] = {
    Some(a.sum+b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {
    //todo:1、构建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingKafka_Receiver").setMaster("local[7]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    sc.setCheckpointDir("./kafka-receiver")
    //todo:3、构建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:4、指定zookeeper
    val zkServer="node7:2181,node8:2181,node9:2181"
    //todo:5、指定groupId
    val groupId="kafka_spark_consumer"
    //todo:6、指定topics,map格式,内容(topic名称,分区数)  这里可以利用一个消费者来进行消费
    val topics=Map("kafka_spark"->3)
    //todo:7、通过使用KafkaUtils.createStream来接收kafka topic中的数据,生成DStream
    var num=5
    //todo:8、相当于开启了5个线程去读取kafka topic中的数据,生成DStream
    val resultDStream = (1 to 5).map(x => {
        val kafkaData = KafkaUtils.createStream(ssc, zkServer, groupId, topics, StorageLevel.MEMORY_AND_DISK).map(x => x._2)
        kafkaData
      }
    )
    //todo:利用StreamContext将所有的线程产生的DStream组合在一起
    val kafkaDStream = ssc.union(resultDStream)

//    val dstream = KafkaUtils.createStream(ssc,zkServer,groupId,topics,StorageLevel.MEMORY_AND_DISK)
    //todo:8、获取kafka中topic的内容
//    val kafkaDStream = dstream.map(x=>x._2)
    //todo:9、切分每一行,每个单词记为1
    val wordAndOne= kafkaDStream.flatMap(_.split(" ")).map((_,1))
    //todo:10、统计相同单词出现的次数
    val result = wordAndOne.updateStateByKey(updateFunc)
    //todo:11、打印
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 启动:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/12/03 17:21:10 INFO SparkContext: Running Spark version 2.0.2
17/12/03 17:21:10 INFO SecurityManager: Changing view acls to: acer
17/12/03 17:21:10 INFO SecurityManager: Changing modify acls to: acer
17/12/03 17:21:10 INFO SecurityManager: Changing view acls groups to: 
17/12/03 17:21:10 INFO SecurityManager: Changing modify acls groups to: 
17/12/03 17:21:10 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(acer); groups with view permissions: Set(); users  with modify permissions: Set(acer); groups with modify permissions: Set()
17/12/03 17:21:11 INFO Utils: Successfully started service 'sparkDriver' on port 4462.
17/12/03 17:21:11 INFO SparkEnv: Registering MapOutputTracker
17/12/03 17:21:11 INFO SparkEnv: Registering BlockManagerMaster
17/12/03 17:21:11 INFO DiskBlockManager: Created local directory at C:\Users\acer\AppData\Local\Temp\blockmgr-74cee74c-ebde-4343-a5b0-ed2306713060
17/12/03 17:21:11 INFO MemoryStore: MemoryStore started with capacity 1990.8 MB
17/12/03 17:21:11 INFO SparkEnv: Registering OutputCommitCoordinator
17/12/03 17:21:12 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/12/03 17:21:12 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.25.1:4040
17/12/03 17:21:12 INFO Executor: Starting executor ID driver on host localhost
17/12/03 17:21:12 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 4499.
17/12/03 17:21:12 INFO NettyBlockTransferService: Server created on 192.168.25.1:4499
17/12/03 17:21:12 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.25.1, 4499)
17/12/03 17:21:12 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.25.1:4499 with 1990.8 MB RAM, BlockManagerId(driver, 192.168.25.1, 4499)
17/12/03 17:21:12 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.25.1, 4499)
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-79821116-0 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-79821116-2 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-79821116-1 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-e4b47ad4-0 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-e4b47ad4-1 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-dc36a44a-0 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-e4b47ad4-2 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-dc36a44a-2 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873376-91153e23-1 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873376-91153e23-0 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873376-91153e23-2 for topic kafka_spark
17/12/03 17:21:14 WARN RangeAssignor: No broker partitions consumed by consumer thread kafka_spark_consumer_gouyadong-1512292873373-dc36a44a-1 for topic kafka_spark
-------------------------------------------
Time: 1512292875000 ms
-------------------------------------------

-------------------------------------------
Time: 1512292880000 ms
-------------------------------------------
  • 发送消息到topic:

image.png

  • 结果如下:

image.png

注意:如果使用下面这种同时开启多个线程的模式,那么setMaster("local[7]")中的数量要比开启的线程数量大,否则接收不到来自topic中的消息。

总结:通过这种方式实现,刚开始的时候系统正常运行,没有发现问题,但是如果系统异常重新启动SparkStreaming程序后,发现程序会重复发送已经处理过的数据,这种基于receiver的方式,是使用Kafka的高阶API来做zookeeper中保存消费过的offset的。这是消费kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和zookeeper之间可能是不同步的。官方现在已经推荐使用这种整合方式,推荐我们使用下面的第二种方式:KafkaUtils.createDirectStream()方式。

5.2、KafkaUtils.createDirectDstream方式

不同于Receiver接收数据,这种方式定期地从Kafka中的topic下对应的partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,Spark通过调用kafka简单的消费者API读取一定范围的数据。

相比基于Receiver方式有几个优点:

A、简化并行

      不需要创建多个kafka输入流,然后union它们,SparkStreaming将会创建和kafka分区的一样的RDD的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。

B、高效

      第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是被kafka复制,另一次是写到WAL中。而没有receiver的这种方式消除了这个问题。

C、恰好一次语义(Exactly-once-semantics)

      Receiver读取kafka数据是通过kafka高层次API把偏移量写入zookeeper中,虽然这种方法可以通过数据保存在WAL中保证数据不丢失,但是可能会因为SparkStreaming和zk中保存的偏移量不一致而导致数据被消费了多次。EOS通过实现kafka低层次API,偏移量仅仅被ssc(StreamingContext)保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具。

5.2.1、KafkaUtils.createDirectStream实现

(1)、前面的步骤和KafkaUtils.createDstream方式一样,接口开始编写代码

  • SparkStreamingKafka_direct类:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * 利用sparkStreaming接收kafka中的消息,采用的是低层次API--direct
  *
  * @author:admin
  *
  */
object SparkStreamingKafka_direct {
  def updateFunc(a:Seq[Int],b:Option[Int]) :Option[Int] = {
    Some(a.sum+b.getOrElse(0))
  }

  def main(args: Array[String]): Unit = {
    //todo:1、构建SparkConf
    val sparkConf = new SparkConf().setAppName("SparkStreamingKafka_direct").setMaster("local[6]")
    //todo:2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    sc.setCheckpointDir("./kafka-direct")
    //todo:3、构建StreamingContext
    val ssc = new StreamingContext(sc,Seconds(5))
    //todo:4、kafka的参数配置
    val kafkaParams=Map("metadata.broker.list"->"node7:9092,node8:9092,node9:9092")
    //todo:5、定义给topics,是一个集合,可以存放多个topic
    val topics=Set("kafka_spark")
    //todo:6、利用KafkaUtils.createDirectStream构建DStream
    val kafkaTopicDS = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
    //todo:7、获取kafka中的topic数据
    val kafkaData = kafkaTopicDS.map(x=>x._2)
    //todo:8、切分每一行,每个单词记为1
    val wordAndOne = kafkaData.flatMap(_.split(" ")).map((_,1))
    //todo:9、相同单词次数累加
    val result = wordAndOne.updateStateByKey(updateFunc)
    //todo:10、打印
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
  • 运行结果:

image.png

image.png



除特别注明外,本站所有文章均为东哥技术分享原创,转载请注明出处来自http://quliming.com/post/12.html

评论回复