在利用Spark處理數據時,如果數據量不大,那麼Spark的默認配置基本就能滿足實際的業務場景。但是當數據量大的時候,就需要做一定的參數配置調整和優化,以保證業務的安全、穩定的運行。並且在實際優化中,要考慮不同的場景,採取不同的優化策略。
1.合理設置微批處理時間
在SparkSreaming流式處理中,合理的設置微批處理時間(batchDuration)是非常有必要的。
如果batchDuration設置過短,會導致SparkStreaming頻繁提交job。如果每個batchDuration所產生的job不能在這個時間內完成處理,就會造成job不斷堆積,最終導致SparkStreaming發生阻塞,甚至程序宕掉。
需要根據不同的應用場景和硬件配置等確定,可以根據SparkStreaming的可視化監控界面,觀察Total Delay等指標來進行batchDuration的調整。
2.控制消費的最大速率
比如SparkStreaming和Kafka集成,採用direct模式時,需要設置參數spark.streaming.kafka.maxRatePerPartition以控制每個Kafka分區最大消費數。該參數默認沒有上線,即Kafka當中有多少數據它就會直接全部拉出。
但在實際使用中,需要根據生產者寫入Kafka的速率以及消費者本身處理數據的速度綜合考慮。
同時還需要結合上面的batchDuration,使得每個partition拉取的數據,要在每個batchDuration期間順利處理完畢,做到儘可能高的吞吐量,該參數的調整需參考可視化監控界面中的Input Rate和Processing Time。
3.緩存反覆使用的"數據集"
Spark中的RDD和SparkStreaming中的DStream,如果被反覆的使用,最好利用cache或者persist算子,將"數據集"緩存起來,防止過度的調度資源造成的不必要的開銷。
4.合理的設置GC
JVM垃圾回收是非常消耗性能和時間的,尤其是stop world、full gc非常影響程序的正常運行。
關於JVM和參數配置,建議研讀《JVM內存管理和垃圾回收》、《JVM垃圾回收器、內存分配與回收策略》、《內存洩漏、內存溢出和堆外內存,JVM優化配置參數》。
5.合理設置CPU
每個executor可以佔用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況。
要避免CPU的使用浪費,比如一個executor佔用多個core,但是總的CPU利用率卻不高。此時建議讓每個executor佔用相對之前較少的core,同時worker下面增加更多的executor進程來增加並行執行的executor數量,從而提高CPU利用率。同時要考慮內存消耗,畢竟一臺機器運行的executor越多,每個executor的內存就越小,容易產生OOM。
6.使用Kryo進行序列化和反序列化
Spark默認使用Java的序列化機制,但這種Java原生的序列化機制性能卻比Kryo差很多。使用Kryo需要進行設置:
//設置序列化器為KryoSerializer
SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
//註冊要序列化的自定義類型
SparkConf.registerKryoClasses(Array(classOf[CustomClass1],classOf[CustomClass2]))
7.使用高性能的算子
1)使用reduceByKey、aggregateByKey替代groupByKey
2)filter之後進行coalesce操作
3)使用repartitionAndSortWithinPartition
替代repartition與sort操作
4)使用mapPartition替代map
5)使用foreachPartition替代foreach
要結合實際使用場景,進行算子的替代優化。
除了上述常用調優策略,還有合理設置Spark並行度,比如參數spark.default.parallelism的設置等,所有這些都要求對Spark內核原理有深入理解,這裡不再一一闡述。
阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。
Apache Spark技術交流社區公眾號,微信掃一掃關注