大數據

深入剖析 Delta Lake:Schema Enforcement & Evolution

編譯:辰山,阿里巴巴計算平臺事業部 EMR 高級開發工程師,目前從事大數據存儲方面的開發和優化工作


在實踐經驗中,我們知道數據總是在不斷演變和增長,我們對於這個世界的心智模型必須要適應新的數據,甚至要應對我們從前未知的知識維度。表的 schema 其實和這種心智模型並沒什麼不同,需要定義如何對新的信息進行分類和處理。

這就涉及到 schema 管理的問題,隨著業務問題和需求的不斷演進,數據結構也會不斷髮生變化。通過 Delta Lake,能夠很容易包含數據變化所帶來的新的維度,用戶能夠通過簡單的語義來控制表的 schema。相關工具主要包括 Schema 約束(Schema Enforcement)和 Schema 演變(Schema Evolution),前者用以防止用戶髒數據意外汙染表,後者用以自動添加適當的新數據列。本文將詳細剖析這兩個工具。

理解表的 Schemas

Apache Spark 的每一個 DataFrame 都包含一個 schema,用來定義數據的形態,例如數據類型、列信息以及元數據。在 Delta Lake 中,表的 schema 通過 JSON 格式存儲在事務日誌中。

什麼是 Schema 約束?

Schema 約束(Schema Enforcement),也可稱作 Schema Validation,是 Delta Lake 中的一種保護機制,通過拒絕不符合表 schema 的寫入請求來保證數據質量。類似於一個繁忙的餐廳前臺只接受預定坐席的顧客,這個機制會檢查插入表格的每一列是否符合期望的列(換句話說,就是檢查每個列是否已經“預定坐席”),那些不在期望名單上的寫入將被拒絕。

Schema 約束如何工作?

Delta Lake 對寫入進行 schema 校驗,也就是說所有表格的寫入操作都會用表的 schema 做兼容性檢查。如果 schema 不兼容,Delta Lake 將會撤銷這次事務(沒有任何數據寫入),並且返回相應的異常信息告知用戶。

Delta Lake 通過以下準則判斷一次寫入是否兼容,即對寫入的 DataFrame 必須滿足:

• 不能包含目標表 schema 中不存在的列。相反,如果寫入的數據沒有包含所有的列是被允許的,這些空缺的列將會被賦值為 null。

• 不能包含與目標表類型不同的列。如果目標表包含 String 類型的數據,但 DataFrame 中對應列的數據類型為 Integer,Schema 約束將會返回異常,防止該次寫入生效。

• 不能包含只通過大小寫區分的列名。這意味著不能在一張表中同時定義諸如“Foo”和“foo”的列。不同於 Spark 可以支持大小寫敏感和不敏感(默認為大小寫不敏感)兩種不同的模式,Delta Lake 保留大小寫,但在 schema 存儲上大小寫不敏感。Parquet 在存儲和返回列信息上面是大小寫敏感的,因此為了防止潛在的錯誤、數據汙染和丟失的問題,Delta Lake 引入了這個限制。

以下代碼展示了一次寫入過程,當添加一次新計算的列到 Delta Lake 表中。

# Generate a DataFrame of loans that we'll append to our Delta Lake table
loans = sql("""
            SELECT addr_state, CAST(rand(10)*count as bigint) AS count,
            CAST(rand(10) * 10000 * count AS double) AS amount
            FROM loan_by_state_delta
            """)

# Show original DataFrame's schema
original_loans.printSchema()
 
"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
"""
 
# Show new DataFrame's schema
loans.printSchema()
 
"""
root
  |-- addr_state: string (nullable = true)
  |-- count: integer (nullable = true)
  |-- amount: double (nullable = true) # new column
"""
 
# Attempt to append new DataFrame (with new column) to existing table
loans.write.format("delta") \
           .mode("append") \
           .save(DELTALAKE_PATH)

""" Returns:

A schema mismatch detected when writing to the Delta table.
 
To enable schema migration, please set:
'.option("mergeSchema", "true")\'
 
Table schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
 
 
Data schema:
root
-- addr_state: string (nullable = true)
-- count: long (nullable = true)
-- amount: double (nullable = true)
 
If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE command for changing the schema.

不同於自動添加新的列,Delta Lake 受到 schema 約束並阻止了這次寫入生效。並且為了幫助定位是哪個列造成了不匹配,Spark 會在錯誤棧中打印出兩者的 schema 作為對照。

Schema 約束有何作用?

由於 Schema 約束是一種嚴格的校驗,因此可以用於已清洗、轉化完成的數據,保證數據不受汙染,可用於生產或者消費。典型的應用場景包括直接用於以下用途的表:

• 機器學習算法

• BI 儀表盤

• 數據分析和可視化工具

• 任何要求高度結構化、強類型、語義 schema 的生產系統

為了準備好最終的數據,很多用戶使用簡單的“多跳”架構來逐步往表中添加結構。更多相關內容可以參考 Productionizing Machine Learning With Delta Lake.

當然,Schema 約束可以用在整個工作流程的任意地方,不過需要注意的是,有可能因為諸如不經意對寫入數據添加了某個列,導致寫入流失敗的情況。

防止數據稀釋

看到這,你可能會問,到底需不需要大費周章做 Schema 約束?畢竟,有時候一個意料之外的 schema 不匹配問題反而會影響整個工作流,特別是當新手使用 Delta Lake。為什麼不直接讓 schema 接受改變,這樣我們就能任意寫入 DataFrame 了。

俗話說,防患於未然,有些時候,如果不對 schema 進行強制約束,數據類型兼容性的問題將會很容易出現,看上去同質的數據源可能包含了邊緣情況、汙染列、錯誤變換的映射以及其他可怕的情況都可能會一夜之間汙染了原始的表。所以更好的做法應該從根本上阻止這樣的情況發生,通過 Schema 約束就能夠做到,將這類錯誤顯式地返回進行恰當的處理,而不是讓它潛伏在數據中,看似寫入時非常順利,但埋下了無法預知的隱患。

Schema 約束能夠確保表 schema 不會發生改變,除非你確切地執行了更改操作。它能有效的防止“數據稀釋”——當新的列頻繁添加,原本簡潔的表結構可能因為數據氾濫而失去原有的含義和用處。Schema 約束的設計初衷就是通過設定嚴格的要求來保證質量,確保表數據不受汙染。

另一方面,假如經過再三確認之後,確定的確需要添加新的列,那解決方法也非常簡單,也就是下文即將介紹的 Schema 演變!

什麼是 Schema 演變

Schema 演變(Schema Evolution)允許用戶能夠方便地修改表的當前 schema,來適應不斷變化的數據。最常見的用法就是在執行添加和覆蓋操作時,自動地添加一個或多個列來適應 schema。

Schema 演變如何工作?

繼續沿用上文的例子,對於之前由於 schema 不匹配導致請求被拒絕的情況,開發人員可以方便地使用 Schema 演變來添加新的列。Schema 演變的使用方式是在 .write 或 .writeStream 的 Spark 命令後面添加上 .option('mergeSchema', 'true')。

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)

可以執行以下 Spark SQL 語句來察看圖表。

# Create a plot with the new column to confirm the write was successful
%sql
SELECT addr_state, sum(`amount`) AS amount
FROM loan_by_state_delta
GROUP BY addr_state
ORDER BY sum(`amount`)
DESC LIMIT 10

當然,也可以選擇通過添加 spark.databricks.delta.schema.autoMerge = True 到 Spark 配置文件中使得該選項對整個 Spark session 生效。不過需要注意的是,這樣使用的話, Schema 約束將不再會對 schema 不匹配問題進行報警提示。

通過指定 mergeSchema 選項,所有在輸入 DataFrame 中存在但在目標表中不存在的列都將被作為該事務操作的一部分添加到 schema 末尾。也允許添加嵌套字段,這些字段將被添加到對應列的末尾。

數據科學家可以利用這個選項來添加新的列(例如一個新增的跟蹤指標,或是這個月的銷售數據)到已有的機器學習表中,而不必廢棄現有依賴於舊的列信息的模型。

以下對錶的添加和覆蓋操作都是合法的 Schema 演變的操作:

• 添加新列(這是最常用的場景)

• 修改數據類型,Null->其他類型,或者向上類型轉換 Byte->Short->Integer

其他改動都是非法的 Schema 演變操作,需要通過添加 .option("overwriteSchema", "true") 選項來覆蓋 schema 以及數據。舉個例子,表原本包含一個類型為 integer 的列“Foo”,而新的 schema 需要改成 string 類型,那麼所有的 Parquet 數據文件都需要覆蓋重寫。包括以下步驟:

• 刪除這個列

• 修改列的數據類型

• 修改列名,僅用大小寫區分(例如“Foo”和“foo”)
最後,在 Spark 3.0 中,支持了顯式 DDL(通過 ALTER TABLE 方式),允許用戶能夠對 schema 執行以下操作:

• 添加列

• 修改列註釋

• 設置表的屬性來定義表的行為,例如設置事務日誌的保留時間

Schema 演變有何作用?

Schema 演變可以用來顯式地修改表的 schema(而不是意外添加了並不想要的列)。這提供了一種簡單的方式來遷移 schema,因為它能自動添加上正確的列名和數據類型,而不需要進行顯式的定義。

總結

Schema 約束能夠拒絕與表不兼容的任何的新的列或者 schema 的改動。通過設置嚴格的限制,數據工程師們可以完全信任他們的數據,從而能夠作出更好的商業決策。

另一方面,schema 演變則對 schema 約束進行了補充,使得一些期望的 schema 變更能夠自動地生效。畢竟,添加一個新的列本就不應該是一件困難的事情。

Schema 約束和 Schema 演變相互補益,合理地結合起來使用將能方便地管理好數據,避免髒數據侵染,保證數據的完整可靠。

原文鏈接:https://databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html


相關閱讀推薦:
Delta Lake,讓你從複雜的Lambda架構中解放出來
【譯】Databricks使用Spark Streaming和Delta Lake對流式數據進行數據質量監控介紹
【譯】Delta Lake 0.5.0介紹
Delta Lake - 數據湖的數據可靠性


阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!image.png
對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。image.png
Apache Spark技術交流社區公眾號,微信掃一掃關注image.png

Leave a Reply

Your email address will not be published. Required fields are marked *