編譯:江宇,阿里雲EMR技術專家。從事Hadoop內核開發,目前專注於機器學習、深度學習大數據平臺的建設。
簡介:
在機器學習領域,Apache Spark 由於其支持 SQL 類型的操作以及高效的數據處理,被廣泛的用於數據預處理流程,同時 TensorFlow 作為廣受歡迎的深度學習框架被廣泛的用於模型訓練。儘管兩個框架有一些共同支持的數據格式,但是,作為 TFRecord—TensorFlow 的原生格式,並沒有被 Spark 完全支持。儘管之前有過一些嘗試,試圖解決兩個系統之間的差異(比如 Spark-TensorFlow-Connector),但是現有的實現都缺少很多 Spark 支持的重要特性。
本文中,我們將介紹 Spark 的一個新的數據源,Spark-TFRecord。Spark-TFRecord 的目的是提供在Spark中對原生的 TensorFlow 格式進行完全支持。本項目的目的是將TFRecord 作為Spark數據源社區中的第一類公民,類似於 Avro,JSON,Parquet等。Spark-TFRecord 不僅僅提供簡單的功能支持,比如 Data Frame的讀取、寫入,還支持一些高階功能,比如ParititonBy。使用 Spark-TFRecord 將會使數據處理流程與訓練工程完美結合。
LinkedIn 內部 Spark 和 TensorFlow 都被廣泛的使用。Spark 被用於數據處理、訓練數據預處理流程中。Spark 同時也是數據分析的領先工具。隨著原來越多的商業部門使用深度學習模型,TensorFlow 成為了模型訓練和模型服務的主流工具。開源的TensorFlow 模型使用 TFRecord 作為數據格式,而LinkedIn 內部大部分使用 Avro 格式。為了模型訓練,我們或者修改代碼使模型訓練能夠讀取avro格式,或者將avro格式的datasets轉化為TFRecord。Spark-TFRecod主要是解決後者,即將不同格式轉化為TFRecord。
現有的項目和之前的嘗試
在 Spark-TFRecord 項目之前,社區提供 Spark-TensorFlow-Connector , 在 Spark 中讀寫 TFRecord 。Spark-TensorFlow-Connector 是 TensorFlow 生態圈的一部分,並且是由 DataBricks,spark 的創始公司提供。儘管 Spark-TensorFlow-Connector 提供基本的讀寫功能,但是我們在LinkedIn的使用中發現了兩個問題。首先,它基於 RelationProvider 接口。這個接口主要用於Spark 與數據庫連接,磁盤讀寫操作都是由數據庫來支持。然而 Spark-TensorFlow-Connector 的使用場景是磁盤IO,而不是連接數據庫,這塊接口需要開發者自己實現 RelationProvider 來支持IO操作。這就是為什麼Spark-TensorFlow-Connector 大量代碼是用於不同的磁盤讀寫場景。
此外,Spark-TensorFlow-Connector 缺少一些 Spark支持的重要功能,比如 PartitionBy 用於將dataset 根據不同列進行分片。我們發現這個功能在LinkedIn 中對於模型訓練非常重要,提供訓練過程中根據實體IDs進行切分進行分佈式訓練。這個功能在TensorFlow 社區中也是高需求。
Spark-TFRrecord 為了解決上述問題,實現了FileFormat 接口,其他的原生格式比如 Avro,Parquet 等也實現了該接口。使用該接口後,TFRecord 就獲取了所有的 DataFrame 和 DataSet 的I/O API,包括之前說的 PartitionBy 功能。此外,之後的 Spark I/O 接口的功能增強也能夠自動獲取到。
設計
我們起初考慮對 Spark-TensorFlow-Connector 打補丁的方式去獲取 PartitionBy 功能。檢查過源碼後,我們發現 Spark-TensorFlow-Connector 使用的RelationProvider接口,是用於連接 Spark 與 SQL 數據庫的,不適用於 TensorFlow 場景。然後並沒有一個簡單解決方式去解決 RelationProvider 並不提供磁盤I/O操作這一問題。於是,我們決定採取了不同的方式,我們實現了FileFormat,FileFormat是用來實現底層的基於文件的I/O操作。實現這一功能對LinkedIn的場景是非常有用的,我們的datasets基本上都是直接讀寫磁盤。
下圖展示了各個模塊
每個模塊作用如下:
Schema Inferencer: 用於將Spark的數據類型推測為TFRecord的數據類型,我們複用了很多Spark-Tensorflow-Connector功能。
TFRecord Reader: 讀取磁盤中TFRecord文件並使用反序列化器將TFRecord轉換為Spark的InternalRow數據結構。
TFRecord Writer:將Spark的InternalRow數據結構通過序列化器轉化為TFRecord格式並保存至磁盤。我們使用TensorFlow Hadoop庫的寫入器。
TFRecord Deserializer: 反序列化器,將TFRecord轉化為Spark InternalRow。
TFRecord Serializer: 序列化器,將Spark InternalRow轉化為TFRecord。
如何使用Spark-TFRecord
Spark-TFRecord與Spark-TensorFlow-Connector完全後向兼容。遷移十分方便,只需要加入spark-tfrecord jar包並且指定數據格式為“tfrecord”。下面的例子顯示瞭如何使用Spark-TFRecord去讀取傾斜和partition TFRecord文件。更多的例子可以參照github倉庫。
// launch spark-shell with the following command:
// SPARK_HOME/bin/spark-shell --jar target/spark-tfrecord_2.11-0.1.jar
import org.apache.spark.sql.SaveMode
val df = Seq((8, "bat"),(8, "abc"), (1, "xyz"), (2, "aaa")).toDF("number", "word")
df.show
// scala> df.show
// +------+----+
// |number|word|
// +------+----+
// | 8| bat|
// | 8| abc|
// | 1| xyz|
// | 2| aaa|
// +------+----+
val tf_output_dir = "/tmp/tfrecord-test"
// dump the tfrecords to files.
df.repartition(3, col("number")).write.mode(SaveMode.Overwrite).partitionBy("number").format("tfrecord").option("recordType", "Example").save(tf_output_dir)
// ls /tmp/tfrecord-test
// _SUCCESS number=1 number=2 number=8
// read back the tfrecords from files.
val new_df = spark.read.format("tfrecord").option("recordType", "Example").load(tf_output_dir)
new_df.show
// scala> new_df.show
// +----+------+
// |word|number|
// +----+------+
// | bat| 8|
// | abc| 8|
// | xyz| 1|
// | aaa| 2|
總結
Spark-TFRecord使得Record可以作為Spark 數據格式的一等公民與其他數據格式一起使用。包含了所有dataframe API的功能,比如讀、寫、分區等。目前我們僅限於schemas符合Spark-Tensorflow-Connector要求。未來的工作將會提供更復雜的schemas支持。
原文鏈接:
https://engineering.linkedin.com/blog/2020/spark-tfrecord
阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。
Apache Spark技術交流社區公眾號,微信掃一掃關注