今天早上的课 主要是讲解在 Spark Streaming 中怎么进行 事务处理
Exactly Once 的事務處理
1) 數據零掉失,必需有可靠的數據來源和可靠的 Receiver, 且整個應用程序的 metadata 必需進行 checkpoint, 且通過 WAL 來保証數據的安全性。 2) Spark Streaming 1.3 的時候為了免 WAL 的性能損失和實現 Exactly Once 而提供了 Kafka Direct API, 把 Kafka 作為文件存儲系統,此時兼具有流的優勢和文件系統的優勢,至此,Spark Streaming + Kafka 就構成完美的世界。所有的 Executor 通過 kafka API 直接消費數據。直接管理 Offset 才不會重複消費數據。 第一,數據不用copy 副本。 第二,不需要進行 WAL 備份,不會做成不必要的性能損耗。
第三, Kafka 的比HDFS 高效很多,因為他內存中採用 memory copy 的方式。
數據掉失及其具體的解決方式
1) 在 Receiver 收到數據且通過 Driver 的調度 Executor 開始計算數據的時候,如果 Driver 突然崩潰,到此 Executor 會被 Kill 掉,那麼 Executor 中的數據就會掉失,此時就必需通過 e.g. WAL的方式,讓所有的數據都會通過例如 HDFS 的方式首先進行安全性容錯處理,此時如果 Executor 中的數據掉失話就可以通過 WAL 恢復過來
據重複讀取的情況
在 Receiver 收到數據並且保存到了 HDFS 等持久化引擊但是沒有來得及進行 updateOffsets, 此時 Receiver 崩潰後重新啟動就會通過管理 Kafka 的 ZooKeeper 中元數據,但是此時 Spark Streaming 認為是成功的,但Kafka 認為是失敗的,因為沒有更新 (updateOffsets 到 Zookeeper 中)。此時導致數據重複消費的情況。
性能損失:
1) 通過WAL的方式進行會極大的損傷 Spark Streaming 中 Receiver 接收數據的性能。
2) 如果通過 Kafka 作為數據來源的話, Kafka中有數據, 然後Receiver接收的時候又會有數據副本,這個數據其實的存儲資源的浪費。
關於Spark Streaming 數據輸出多次重寫及其解決方案:
1) 為什麼會有這個問題,因為 Spark Streaming 在計算的時候會基於 Spark Core, Spark Core 天生會做以下事情會導致 Spark Streaming 的結果(部份)重複輸出。task 重試,慢任務推測,stage 重複,job 重測。具體解決方案: 設置 spark.task.maxFailure 次數為 1 設置 spark.speculation 為關閉狀態(因為慢任務推測其實非常消耗性能,所以關閉後可以顥著提高 Spark Streaming 處理性能。Spark Streaming on Kafka 的話,Job 失敗後可以設置 auto.offset.reset 為 largest 的方式;
最後再次強調 可以通過 transform 和 foreachRDD 基於業務邏輯控制來實現數據不重複消費 和輸出不重複!這兩個方法類似於 Spark Streaming 的後門,可以做任意想象的控制操作
Thanks for reading
Janice
——————————————————————————————–
Reference: DT大数据梦工厂SPARK版本定制課程 – 第4课:Spark Streaming的Exactly-One的事务处理和不重复输出彻底掌握