同步數據到MaxCompute
準備工作
1.創建MaxCompute表
DataHub支持將數據同步到MaxCompute對應的數據表中,同時支持分區表和非分區表,一般情況下推薦用戶使用分區表進行數據同步以方便MaxCompute數據處理。
目前DataHub支持將TUPLE和BLOB的數據同步到MaxCompute數據表中。
1)針對TUPLE類型topic,MaxCompute目標表數據類型需要和DataHub數據類型相匹配,具體的數據類型映射關系如下:
MaxCompute | DataHub |
BIGINT | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
TINYINT | TINIINT |
SMALLINT | SMALLINT |
INT | INTEGER |
FLOAT | FLOAT |
MAP | 不支持 |
ARRAY | 不支持 |
由于目前DataHub并不能完全支持MaxCompute所有的數據類型,請用戶盡量根據DataHub數據類型創建MaxCompute表結構。
2)針對BLOB數據類型,需要要求MaxCompute表結構僅需要包含一列STRING類型的column即可,DataHub默認會將數據同步到該column中。
DataHub | MaxCompute |
BLOB | STRING |
3)同時為了方便數據追蹤和問題排查,建議用戶在創建MaxCompute表結構時,增加一列__rowkey__ STRING
字段,DataHub會自動將DataHub對應數據的trace信息同步到該列中,以方便后續數據排查。
2.準備同步任務賬號并授權
1)新建同步MaxCompute任務時,需要用戶手動填寫訪問MaxCompute表的賬號信息,請用戶確保填入有效的賬號信息(一般情況下采用MaxCompute子賬號即可)。
2)需要給該賬號授予訪問MaxCompute表的響應權限,具體權限包括CreateInstance
、Describe
、Alter
以及Update
權限。
用戶可以使用DataWorks管控臺進行MaxCompute對應表的權限管理,參考MaxCompute高級配置,也可以選擇使用MaxCompute的命令行工具進行授權,參考MaxCompute使用及授權管理。
3.確認TimestampUnit單位
1) Connector中TimestampUnit的作用,就是將數據中TIMESTAMP類型的數據(如果有),以TimestampUnit為單位進行轉換后寫入到下游系統的日期類型(如datetime類型)。
2)如果TIMESTAMP列寫入的是以秒為單位的值,那新建Connector的時候TimestampUnit就選擇“SECOND”;如果寫入的是以毫秒為單位的值,那就選擇“MILLISECOND”;如果寫入的是以微秒為單位的值,那就選擇“MICROSECOND”
注意事項
由于MaxCompute目前的寫入標準原因,分區數越多就會導致DataHub同步數據越慢。因此,在創建MaxCompute同步任務時,請盡可能的控制分區數,尤其是
USER_DEFINE
同步模式。同一分區的數據越連續越好,不要頻繁的分區跳變。
同步模式控制創建分區時,請不要創建過多的分區數。
創建同步任務
依次進入
項目列表/Project詳情/Topic詳情
頁面。點擊右上角的
+ 同步
按鈕進行同步任務創建。
選擇MaxCompute類型作業,如下圖所示:
1)TUPLE類型同步2)BLOB類型同步
部分配置說明:
下面羅列了部分管控臺創建同步任務的配置說明,更多更靈活的操作請參考SDK使用。
導入字段
DataHub可以根據用戶設置將部分column內容同步到MaxCompute表中。
分區模式
分區模式決定了將數據寫入到MaxCompute哪個分區中,目前DataHub支持以下分區方式:
分區模式 | 分區依據 | 支持Topic類型 | 說明 |
USER_DEFINE | Record中的分區列(和MaxCompute的分區字段同名)的value值 | TUPLE | (1). DataHub schema中必須包含MaxCompute分區字段 (2). 該列值必須為 |
SYSTEM_TIME | Record寫入DataHub的時間 | TUPLE / BLOB | (1). 分區配置中設置MaxCompute分區的時間轉換Format格式 (2). 設置時區信息 |
EVENT_TIME | Record中的 | TUPLE | (1). 分區配置中設置MaxCompute分區的時間轉換Format格式 (2). 設置時區信息 |
META_TIME | Record的屬性字段 | TUPLE / BLOB | (1). 分區配置中設置MaxCompute分區的時間轉換Format格式 (2). 設置時區信息 |
其中SYSTEM_TIME
、EVENT_TIME
和META_TIME
均是根據時間Timestamp和時區配置來進行MaxCompute分區的轉換過程,單位默認為微秒。
分區配置決定了根據時間戳轉換MaxCompute分區時的相關配置。目前管控臺默認固定的MaxCompute分區格式,分區配置對應為
分區 | 時間Format | 說明 |
ds | %Y%m%d | day |
hh | %H | hour |
mm | %M | minute |
分區間隔決定了根據時間戳轉換MaxCompute分區時所采用的時間間隔。時間范圍是
15分鐘 ~ 1440分鐘(1天)
,跳變間隔15分鐘
。時區信息(TimeZone)時區信息決定了根據時間戳轉換MaxCompute分區時所采用的轉換時區。
分隔符BLOB數據同步時,可以指定16進制分隔符來決定是否對BLOB數據分割后再同步MaxCompute,比如
0A
表示\n
(換行符)。Base64編碼DataHub BLOB默認存儲二進制數據,而MaxCompute對應的同步列為STRING類型,因此管控臺創建同步任務時,默認采用base64編碼后進行同步,更多定制化需求請參考SDK實現。
查看同步任務
可以點擊對應connector的詳情頁面查看同步任務的運行狀態和點位等信息, 包含同步點位、同步狀態以及重啟和停止等操作,如下圖所示:
同步示例
1. USER_DEFINE同步模式
建立DataHub Topic
備注: topic schema中必須需要包含MaxCompute分區字段,類型為STRING,如下圖所示:
向DataHub Topic寫入數據,可以使用datahub-sdk進行數據寫入
測試過程中使用SDK寫入幾條條數據,其中[ds,hh,mm]分別為:[20210304,01,15]和[20210304,02,15],數據內容如下所示:
3 建立同步任務
USER_DEFINE分區模式可以通過在同步中設置分區配置字段,如果MaxCompute沒有對應的表,可自動創建。 這里導入字段中設置導入f1、f2字段,不同步f3字段。
4 確認同步數據
可以從DataHub管控臺查看對應同步任務的同步信息。 查詢MaxCompute數據結果,結果如下: 可以看到在USER_DEFINE模式下,DataHub會根據MaxCompute分組字段所對應的value
將DataHub中的數據同步到對應的分區中。
2. SYSTEM_TIME同步模式
建立DataHub Topic
備注:由于分區是根據寫入DataHub時間來計算的,因此topic schema只需包含數據字段,不需要包含分區字段,如下圖所示:
向DataHub Topic寫入數據,可以使用datahub-sdk進行數據寫入。
測試過程中使用SDK寫入幾條數據,DataHub目前對應的寫入時間為
2021-03-04 14:02:45
,數據內容如下所示:建立同步任務
請注意分區配置需要和MaxCompute表分區一致。
4 確認同步數據
可以從DataHub管控臺查看對應同步任務的同步信息,如DoneTime。 查詢MaxCompute數據結果,結果如下: 可以看到在SYSTEM_TIME模式下,DataHub會根據數據寫入DataHub的時間
將DataHub中的數據同步到對應的分區中。
常見問題
同步到MaxCompute timestamp字段時間變為1970-01-19
原因:DataHub同步MaxCompute默認時間戳單位為微秒,用戶寫入時間戳為毫秒解決方案:寫入DataHub時間戳以微秒方式寫入。