EMR-3.27.x及之前版本使用Flink社區開源版本,EMR-3.27.x之后版本使用完全兼容開源Flink的企業版(VVR)。本文介紹如何配置Flink(VVR)類型的作業。
背景信息
Flink企業版由Apache Flink創始團隊官方出品,擁有全球統一商業化品牌。
VVR提供企業版StateBackend,性能是開源版本的3~5倍。在EMR Hadoop集群中,您可使用VVR引擎和EMR數據開發功能提交作業。VVR支持開源Flink 1.10版本,默認使用商業GeminiStateBackend,具備以下特性:
采用創新的數據結構,提高隨機查詢、降低讀磁盤I/O的性能。
優化Cache策略,內存充足情況下熱數據不落盤,并且Compaction后Cache不會失效。
完全使用Java實現,消除RocksDB的JNI開銷。
使用堆外內存,并基于GeminiDB的特點實現高效的內存分配器,消除JVM GC帶來的影響。
支持異步增量Checkpoint,同步階段只進行內存索引的拷貝,相較于RocksDB可以避免I/O帶來的抖動。
支持Local Recovery和Timer落盤。
如果您想使用GeminiStateBackend,請不要在代碼中指定StateBackend類型。使用GeminiStateBackend啟動時,TM的內存不少于1728 MB。
Flink中Checkpoint和StateBackend的基礎配置同樣適用于GeminiStateBackend,具體請參見Configuration。
您可以根據具體需求配置參數,部分特殊參數設置如下。
參數 | 說明 |
state.backend.gemini.memory.managed | 默認值為true,表示將自動根據Managed Memory以及Task Slot數計算每個Backend的內存。取值如下:
|
state.backend.gemini.offheap.size | 默認值為2 GB,當state.backend.gemini.memory.managed為false時,設置每個Backend的內存。 |
state.backend.gemini.local.dir | 表示GeminiDB本地數據文件的存放目錄。 |
state.backend.gemini.timer-service.factory | 默認值為HEAP,表示timer-service state的存儲位置。取值如下:
|
前提條件
已創建Hadoop集群。
已創建項目。
已獲取作業所需的資源,以及作業需要處理的數據文件,例如,JAR包、數據文件名稱及其保存路徑。
說明建議使用OSS維護要運行的JAR包。
如果使用本地路徑,請使用文件的絕對路徑。
操作步驟
- 進入數據開發的項目列表頁面。
- 通過阿里云賬號登錄阿里云E-MapReduce控制臺。
- 在頂部菜單欄處,根據實際情況選擇地域和資源組。
- 單擊上方的數據開發頁簽。
- 單擊待編輯項目所在行的作業編輯。
新建Flink類型作業。
- 在頁面左側,在需要操作的文件夾上單擊右鍵,選擇新建作業。
在新建作業對話框中,輸入作業名稱和作業描述,從作業類型下拉列表中選擇Flink作業類型。
- 單擊確定。
編輯作業內容。
在作業內容中,填寫提交該作業需要提供的命令行參數。
Flink類型作業支持JAR包形式的Flink Datastream、Table和SQL作業,示例如下。
run -m yarn-cluster -yjm 1024 -ytm 2048 ossref://path/to/oss/of/WordCount.jar --input oss://path/to/oss/to/data --output oss://path/to/oss/to/result
EMR-3.x版本自EMR-3.28.2版本開始,Flink類型作業同時支持PyFlink作業,示例如下。
run -m yarn-cluster -yjm 1024 -ytm 2048 -py ossref://path/to/oss/of/word_count.py
PyFlink作業其它可用參數,請參見Apache Flink官方文檔。
單擊保存。
說明您可以根據集群的版本來訪問Flink的Web UI:
EMR-3.29.0之前版本
僅支持通過SSH隧道方式訪問Web UI。
EMR-3.29.0及后續版本
(推薦)您可以通過EMR控制臺的方式訪問Web UI。
您可以通過SSH隧道方式訪問Web UI。