作者:李呈祥,花名司麟 ,阿里雲智能EMR團隊高級技術專家,Apache Hive Committer, Apache Flink Committer,目前主要專注於EMR產品中開源計算引擎的優化工作。
Spark社區在Spark Packages網站中索引了許多第三方庫,這些第三方庫由不同的開發者貢獻,作為Spark生態圈的一部分,擴充了Spark的使用範圍和使用場景,其中很多對於我們日常的使用可能有幫助,我們準備開啟一個系列文章介紹Spark Packages中一些有意思的第三方庫,作為系列的第一篇,本文主要介紹Optimus,一個基於PySpark的簡單易用的數據準備工具。
本文的部分內容源自Optimus官網和相關介紹文章,原文鏈接參考文末引用部分。
在Spark(Pyspark)的支持下,Optimus允許用戶使用自己的或一組預先創建的數據轉換功能來清理數據,對其進行概要分析並應用與數據分析和機器學習等場景,可以輕鬆地利用python語言進行所有這些操作。Optimus主要關注與以下幾個方面:
- 創建一個可靠的API來訪問和處理數據。
- 讓用戶輕鬆地從Pandas遷移。
- 使數據探索更加容易。
創建一個可靠的API來訪問和處理數據
首先,我們來看看Optimus的基本使用方式
from pyspark.sql import SparkSession
from optimus import Optimus
// 創建context
spark = SparkSession.builder.appName('optimus').getOrCreate()
op= Optimus(spark)
// 加載數據
df = op.load.csv("../examples/data/foo.csv")
// 執行清理
new_df = df\
.rows.sort("rank","desc")\
.withColumn('new_age', df.age)\
.cols.lower(["names","function"])\
.cols.date_transform("date arrival", "yyyy/MM/dd", "dd-MM-YYYY")\
.cols.years_between("date arrival", "dd-MM-YYYY", output_cols = "from arrival")\
.cols.remove_accents("names")\
.cols.remove_special_chars("names")\
.rows.drop(df["rank"]>8)\
.cols.rename(str.lower)\
.cols.trim("*")\
.cols.unnest("japanese name", output_cols="other names")\
.cols.unnest("last position seen",separator=",", output_cols="pos")\
.cols.drop(["last position seen", "japanese name","date arrival", "cybertronian", "nulltype"])
//保存結果
new_df.save.csv("data/foo.csv")
Optimus基本PySpark框架,重新組織了對數據進行清理/準備的相關API,把數據處理整理為針對dataframe.rows和dataframe.cols兩類操作,基於rows和cols實現了非常豐富的針對數據清理和準備相關的接口,用戶可以使用這些接口非常方便高效地完成相關工作。在Optimus中,核心的數據操作可以歸納為如下幾類:
- 創建一個DataFrame
- 用append()追加行或列
- 使用select()選擇行或列
- 使用apply()更新或轉換列數據
- 使用drop()刪除行或列
- 使用read()加載數據
- 使用write()保存數據
針對列的操作
對於數據集的操作主要是針對列進行的,所以這裡主要介紹一些典型的針對列的操作類型:
Aggregation
Optimus擴展了PySpark的操作,創建了一種更簡單的方式來對數據集進行統計。
print(df.cols.min(“ height”))
print(df.cols.percentile(['height','rank'],[0.05,0.25,0.5,0.75,0.95]))
print(df.cols.max(“ height”))
print(df.cols.median([“ height”,“ rank”]))
print(df.cols.range([“ height”,“ rank”])))
print(df.cols.std([“ height”,“ rank”]))17.5
{'height':{0.05:17.5,0.25:17.5,0.5:26.0,0.75:28.0,0.95:28.0},'rank':{0.05:7.0,0.25:7.0,0.5:7.0,0.75:10.0,0.95 :10.0}}
28.0
{'height':26.0,'rank':7.0}
{'height':{'min':17.5,'max':28.0},'rank':{'min':7,'max ':10}}
{'height':{'stddev':5.575242894559244},'rank':{'stddev':1.7320508075688772}}
Transformation and Chaining
類似PySpark DataFrame的操作,Optimus的數據轉換操作也可以鏈接起來,甚至還可以和PySpark的DataFrame操作鏈接起來。利用Spark的延遲計算的特性(在示例中show()才會觸發計算),使用Catalyst優化執行計劃。
df = df\
.rows.sort(["rank","height"])\
.cols.lower(["names","function"])\
.cols.remove_accents("names")\
.cols.remove_special_chars("names")\
.cols.trim("names")\
.show()
+---------+------+---------+----+
| names|height| function|rank|
+---------+------+---------+----+
| optimus| 28.0| leader| 10|
| ironhide| 26.0| security| 7|
|bumblebee| 17.5|espionage| 7|
+---------+------+---------+----+
Nest/Unnest
使用nest和unnest操作可以將多列合併成一個新列或者將一列拆分為多列,如下所示:
df.cols.nest(["names", "function"], output_col = "new_col", shape ="string").show()
+---------+------+---------+----+-------------------+
| names|height| function|rank| new_col|
+---------+------+---------+----+-------------------+
| optimus| 28.0| leader| 10| optimus leader|
| ironhide| 26.0| security| 7| ironhide security|
|bumblebee| 17.5|espionage| 7|bumblebee espionage|
+---------+------+---------+----+-------------------+
df.cols.unnest("new_col", " ").cols.drop("new_col")
+---------+------+---------+----+---------+---------+
| names|height| function|rank|new_col_0|new_col_1|
+---------+------+---------+----+---------+---------+
| optimus| 28.0| leader| 10| optimus| leader|
| ironhide| 26.0| security| 7| ironhide| security|
|bumblebee| 17.5|espionage| 7|bumblebee|espionage|
+---------+------+---------+----+---------+---------+
自定義轉換
Optimus具有兩個函數apply()和apply_expr(),用戶可以在其中實現函數(UDF或Pandas UDF)或列表達式。
from pyspark.sql import functions as F
def func(value, args):
return value + 1
df.cols.apply("height", func, "int")\
.cols.apply_expr("rank", F.col("rank")+1)\
.show()
+---------+------+---------+----+
| names|height| function|rank|
+---------+------+---------+----+
| optimus| 29| leader| 11|
| ironhide| 27| security| 8|
|bumblebee| 18|espionage| 8|
+---------+------+---------+----+
讓用戶輕鬆地從Pandas遷移
在數據分析領域,Python是通用語言,而Pandas是最常用的庫,所以Optimus在設計時儘量和Pandas的接口保持一致,以下是Optimus和Pandas以及PySpark的接口對比:
Description | Pandas | Spark | Optimus |
---|---|---|---|
Read csv file | pd.read_csv() | spark.read.csv() | op.read.csv() |
Create Dataframe | pd.Dataframe | df.createdataframe() | op.create.df() |
Append Row | df.append | df.union() | df.row().append() |
Column Mean | df.mean | df1.agg({"x": "max"}) | df.cols().mean() |
Show Rows from Dataframe | df.head() | df.show() | df.show() |
Drop Columns | df.drop() | df.drop() | df.cols().drop() |
Sum all values in a Column | df.sum() | df1.agg({"x": "sum"}) function | df.cols().sum() |
Save Dataframe to csv | df.to_csv() | df.write.csv() | df.save().csv() |
Get a value by index | df.get() | NA | NA |
Get the mode of a column | df.mode() | NI | df.cols().mode() |
Cast a Column | df.astype() | df.column.cast() | df.cols().cast(), astype() as alias |
Substract 2 dataframes | df.sub() | NI | NI |
Merge to dataframes | pd.concat() | df.union() | optimus.concat() |
Apply a user defined fucntion to a column | df.apply(func) | fn = F.udf(labmbda x:x+1, DoubleType()) df.withColumn('disp1', n(df.disp)) | df.cols().apply(func) |
Group rows | df.groupby() | df.groupby() | df.groupby() |
Joint operation between to dataframes | df.join() | df.join() | df.join() |
Fill Null values with x | df.fillna() | df.fillna() | df.fillna() |
Get the max number of a Column | df.max() | df1.agg({"x": "max"}) | df.cols().max() |
Reset index | reset_index() | NA | NA |
NI= Not implemented
NA= Not
除了在Spark無法實現的功能(如reset_index),Optimus實現了幾乎所有Pandas可應用於Spark的功能,而且兩個接口基本一致,大大方便了Pandas用戶的遷移。
使數據探索更輕鬆
Optimus具有功能強大的內置數據探查器,除所有基本操作外,它還提供了獨特的數據探查功能。用戶可以查看特定列中存在多少種數據類型。例如,有一百萬行的顏色值為白色,黑色,紅色以及數百種顏色,如何確定一百萬行中沒有“ 1”數字?數據探查使得用戶可以瞭解數據集的質量,是否有髒數據,為數據清理和準備提供前提信息和驗證方式,使數據集以合適的狀態用於後續的數據分析和ML/DL處理。
df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/Meteorite_Landings.csv").h_repartition()
// 對name列進行profile
op.profiler.run(df, "name", infer=False)
可以看到profile提供了詳細的數據統計信息,包括name列的數據類型,null數量,distinct count數量,topN出現次數,histogram分佈等等,基於這些信息,用戶可以準確瞭解數據的質量。
Profile由於計算量比較大,可能會比較耗時,特別是對於distinct count這種操作,使用relative_error 和 approx_count參數可以以降低精度為代價加速profile速度。
op.profiler.run(df, "name", infer=False, relative_error =1, approx_count=True)
總結
本文主要介紹了Optimus項目,作為一個Spark的第三方庫,Optimus基於PySpark,為用戶提供了一套完整的數據質量探查和數據清理工具集,接口參考Pandas設計,易用且強大,非常適合大規模數據的清理準備工作。限於篇幅,還有很多Optimus的清理接口和Profile功能沒有介紹,感興趣的同學可以訪問Optimus官網探索更多功能和用法。
引用
- https://hi-optimus.com/
- https://github.com/ironmussa/Optimus
- http://docs.hioptimus.com/en/latest/sections/overview.html
- https://towardsdatascience.com/announcing-optimus-v2-agile-data-science-workflows-made-easy-c127a12d9e13
阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。
Apache Spark技術交流社區公眾號,微信掃一掃關注