大數據

Spark為什麼只有在調用action時才會觸發任務執行呢(附算子優化和使用示例)?

Spark算子主要劃分為兩類:transformation和action,並且只有action算子觸發的時候才會真正執行任務。還記得之前的文章《Spark RDD詳解》中提到,Spark RDD的緩存和checkpoint是懶加載操作,只有action觸發的時候才會真正執行,其實不僅是Spark RDD,在Spark其他組件如SparkStreaming中也是如此,這是Spark的一個特性之一。像我們常用的算子map、flatMap、filter都是transformation算子,而collect、count、saveAsTextFile、countByKey、foreach則為action算子。
微信圖片_20200709201425.jpg

1.導致map執行完了要立即輸出,數據也必然要落地(內存和磁盤)

2.map任務的生成、調度、執行,以及彼此之間的rpc通信等等,當牽扯到大量任務、大數據量時,會很影響性能

看到這兩點是不是很容易聯想到MapReduce的計算模型,MapReduce因為中間結果需要落地,導致性能相對Spark較低下,這也是MapReduce廣為詬病的原因之一。所以Spark採用只有調用action算子時才會真正執行任務,這是相對於MapReduce的優化點之一。

但是每個Spark RDD中連續調用多個map類算子,Spark任務是對數據在一次循環遍歷中完成還是每個map算子都進行一次循環遍歷呢?

答案很確定:不需要對每個map算子都進行循環遍歷。Spark會將多個map算子pipeline起來應用到RDD分區的每個數據元素上(後續將要介紹的SparkSQL中的Dataset/DataFrame也是如此)

下面說幾個算子的優化,這也是面試中經常問的問題:

在我們實際的業務場景中經常會使用到根據key進行分組聚合的操作,當然熟悉Spark算子使用的都知道像reduceByKey、groupByKey、aggregateByKey、combineByKey大多都能滿足需求。但是筆者在這裡還是要重點說一下,因為很多人想到分組聚合往往第一個想到的算子就是groupByKey,但是groupByKey相對其他算子性能低並且處理不好的情況下,容易發生數據傾斜。所以我們能用其他算子比如reduceByKey替代groupByKey實現滿足我們業務需求的,就一律不用groupByKey。當然reduceByKey在某些場景下性能會比aggregateByKey低,具體算子的替換要結合實際業務需求場景來定。

這裡主要說明一下reduceByKey和groupByKey的對比,以及幾個算子替代的場景示例:
1.首先這幾個“ByKey”的算子會觸發shullfe,這裡強調一點,對於分佈式任務,如果存在聚合操作的話往往都是要進行shuffle的

2.相對於reduceByKey,groupByKey沒有預先聚合,而是直接將相同key的value進行分組然後再聚合造成shuffle耗費嚴重;而reduceByKey會先在map端進行局部聚合,然後再在reduce端再次聚合,這點類似於MapReduce中combiner組件,可以減少磁盤IO和網絡IO,提高性能

3.aggregateByKey替代reduceByKey的場景:當輸出的結果和輸入的結果不同的時候可以被替換。例如,查找同一個key的所有不同的value值,也即是先根據key進行分組,然後去重。假設採用reduceByKey實現的話,需要先用map講單個元素裝到set裡,然後在針對set進行reduceByKey,偽代碼:rdd.map(case(k,v) => (k, Set(v))).reduceByKey(_ ++ _),但是該過程會導致為每個記錄創建一個set,這是很沒必要的。此時我們可以使用aggregateByKey替代reduceByKey實現該需求,偽代碼:

val zero = mutable.Set[String]()
rdd.aggregateByKey(zero)((set, v) => set += v,(set1, set2) => set1 ++= set2)

具體示例:
1)reduceByKey

val rdd = rowRdd.map { row =>

  val id = row.getAs[String]("id")
  val name = row.getAs[String]("name")
  val count = row.getAs[Long]("count")
  (id, (name, count))
}.map { case (id, (name, count)) => (id, Array(count)) }.reduceByKey(_ ++ _)

2)aggregateByKey

val zeroValue = mutable.Set[(String, Long)]()
val rdd = df.rdd.map { row =>

  val id = row.getAs[String]("id")
  val name = row.getAs[String]("name")
  val count = row.getAs[Long]("count")
  (id, (name, count))

}.aggregateByKey(zeroValue)(

  (set, v) => set += v,
  (set1, set2) => set1 ++= set2)

3)combineByKey

val rdd = df.rdd.map { row =>

  val id = row.getAs[String]("id")
  val name = row.getAs[String]("name")
  val count = row.getAs[Long]("count")
  (id, (name, count))

}.combineByKey(

  (v: (String, Long)) => List(v),
  (c: List[(String, Long)], v: (String, Long)) => v :: c,
  (c1: List[(String, Long)], c2: List[(String, Long)]) => c1 ::: c2)

4.當兩個數據集已經按照key進行分組,此時想對兩個數據集在仍然保持分組的基礎上進行join,則可以使用cgroup,以避免分組展開然後再次分組的開銷

Spark目前提供了80多種算子,想熟練掌握這些算子如何運用,筆者建議學習一下Scala語言,原因除了《Spark通識》中說的那兩點之外,還有就是Spark提供的很多算子跟Scala本身提供的函數功能很相似甚至有些名字都是一樣的,瞭解了Scala提供的,對於學習Spark算子將事半功倍。這裡舉一些常用的transformation和action使用示例:

transformation

map
map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
舉例:
val a = sc.parallelize(1 to 9, 3)
val b = a.map(x => x*2)
a.collect 【Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)】
b.collect 【Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)】

filter
filter是對RDD中的每個元素都執行一個指定的函數來過濾產生一個新的RDD,該RDD由經過函數處理後返回值為true的輸入元素組成。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
val rdd = sc.parallelize(List(1,2,3,4,5,6))
val filterRdd = rdd.filter(_ > 3)
filterRdd.collect() 【返回所有大於3的數據的:Array(6,8,10,12)】

flatMap
與map類似,區別是原RDD中的元素經map處理後只能生成一個元素,而原RDD中的元素經flatmap處理後可生成多個元素來構建新RDD。舉例:對原RDD中的每個元素x產生y個元素(從1到y,y為元素x的值)
val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x => 1 to x)
b.collect 【Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)】

reduceByKey和sortByKey
分組聚合與排序,這裡以單詞統計,並按單詞排序為例

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合(PairRDDFunctions提供)

val rdd4 = rdd3.reduceByKey(_ + _)

//false降序,默認true(OrderedRDDFunctions提供)

val rdd5 = rdd4.sortByKey(false)

repartition

該函數其實就是coalesce函數第二個參數為true的實現,改變分區數會產生shuffle,repartition之後會返回一個新的RDD
var data = sc.parallelize(1 to 12, 3) //分區數3
var rdd1 = data.repartition(1) //分區數1
var rdd1 = data.repartition(4) //4
data.partitions.size 還是3

action

first

first返回RDD中的第一個元素,不排序。
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1.first 【 (A,1) 】
var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1.first 【 10 】

count

count返回RDD中的元素數量。
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1.count 【 3 】

take

take用於獲取RDD中從0到num-1下標的元素,不排序。
var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1.take(1) 【 Array(10) 】
rdd1.take(2) 【 Array(10, 4) 】

像各種save操作,如saveAsNewAPIHadoopDataset都是action算子,這裡就不一一列舉了。

Leave a Reply

Your email address will not be published. Required fields are marked *