本文說明如何創建MaxCompute Sink Connector將數據從云消息隊列 Kafka 版實例的數據源Topic導出至MaxCompute的表。

前提條件

在創建MaxCompute Sink Connector前,請確保您已完成以下操作:
  • 云消息隊列 Kafka 版
    • 云消息隊列 Kafka 版實例開啟Connector。更多信息,請參見開啟Connector
    • 云消息隊列 Kafka 版實例創建數據源Topic。更多信息,請參見步驟一:創建Topic

      本文以名稱為maxcompute-test-input的Topic為例。

  • 大數據計算服務(MaxCompute)
    • 通過MaxCompute客戶端創建表。更多信息,請參見創建表

      本文以名稱為connector_test的項目下名稱為test_kafka的表為例。該表的建表語句如下:

      CREATE TABLE IF NOT EXISTS test_kafka(topic STRING,partition BIGINT,offset BIGINT,key STRING,value STRING) PARTITIONED by (pt STRING);
  • 可選:事件總線EventBridge
    說明 僅在您創建的Connector任務所屬實例的地域為華東1(杭州)或西南1(成都)時,需要完成該操作。

注意事項

  • 僅支持在同地域內,將數據從云消息隊列 Kafka 版實例的數據源Topic導出至MaxCompute。Connector的限制說明,請參見使用限制
  • 如果Connector所屬實例的地域為華東1(杭州)或西南1(成都),該功能會部署至事件總線EventBridge
    • 事件總線EventBridge目前免費供您使用。更多信息,請參見計費說明
    • 創建Connector時,事件總線EventBridge會為您自動創建服務關聯角色AliyunServiceRoleForEventBridgeSourceKafkaAliyunServiceRoleForEventBridgeConnectVPC
      • 如果未創建服務關聯角色,事件總線EventBridge會為您自動創建對應的服務關聯角色,以便允許事件總線EventBridge使用此角色訪問云消息隊列 Kafka 版和專有網絡VPC。
      • 如果已創建服務關聯角色,事件總線EventBridge不會重復創建。
      關于服務關聯角色的更多信息,請參見服務關聯角色
    • 部署到事件總線EventBridge的任務暫時不支持查看任務運行日志。Connector任務執行完成后,您可以在訂閱數據源Topic的Group中,通過消費情況查看任務進度。具體操作,請參見查看消費狀態

操作流程

使用MaxCompute Sink Connector將數據從云消息隊列 Kafka 版實例的數據源Topic導出至MaxCompute的表操作流程如下:

  1. 授予云消息隊列 Kafka 版訪問MaxCompute的權限。
  2. 可選:創建MaxCompute Sink Connector依賴的Topic和Group

    如果您不需要自定義Topic和Group,您可以直接跳過該步驟,在下一步驟選擇自動創建。

    重要 部分MaxCompute Sink Connector依賴的Topic的存儲引擎必須為Local存儲,大版本為0.10.2的云消息隊列 Kafka 版實例不支持手動創建Local存儲的Topic,只支持自動創建。
    1. 創建MaxCompute Sink Connector依賴的Topic
    2. 創建MaxCompute Sink Connector依賴的Group
  3. 創建并部署MaxCompute Sink Connector
  4. 結果驗證
    1. 發送測試消息
    2. 查看表數據

創建RAM角色

由于RAM角色不支持直接選擇云消息隊列 Kafka 版作為受信服務,您在創建RAM角色時,需要選擇任意支持的服務作為受信服務。RAM角色創建后,手動修改信任策略。

  1. 登錄訪問控制控制臺
  2. 在左側導航欄,選擇身份管理 > 角色
  3. 角色頁面,單擊創建角色
  4. 創建角色面板,執行以下操作。
    1. 選擇可信實體類型為阿里云服務,然后單擊下一步
    2. 角色類型區域,選擇普通服務角色,在角色名稱文本框,輸入AliyunKafkaMaxComputeUser1,從選擇受信服務列表,選擇大數據計算服務,然后單擊完成
  5. 角色頁面,找到AliyunKafkaMaxComputeUser1,單擊AliyunKafkaMaxComputeUser1
  6. AliyunKafkaMaxComputeUser1頁面,單擊信任策略管理頁簽,單擊修改信任策略
  7. 修改信任策略面板,將腳本中odps替換為alikafka,單擊確定

    替換后的策略如下所示。

    pg_ram

添加權限

為使Connector將消息同步到MaxCompute表,您需要為創建的RAM角色至少授予以下權限:

客體操作描述
ProjectCreateInstance在項目中創建實例。
TableDescribe讀取表的元信息。
TableAlter修改表的元信息或添加刪除分區。
TableUpdate覆蓋或添加表的數據。

關于以上權限的詳細說明以及授權操作,請參見MaxCompute權限

為本文創建的AliyunKafkaMaxComputeUser1添加權限的示例步驟如下:

  1. 登錄MaxCompute客戶端。
  2. 執行以下命令添加RAM角色為用戶。
    add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
    說明 將<accountid>替換為您自己的阿里云賬號ID。
  3. 為RAM用戶授予訪問MaxCompute所需的最小權限。
    1. 執行以下命令為RAM用戶授予項目相關權限。
      grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      說明 將<accountid>替換為您自己的阿里云賬號ID。
    2. 執行以下命令為RAM用戶授予表相關權限。
      grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      說明 將<accountid>替換為您自己的阿里云賬號ID。

創建MaxCompute Sink Connector依賴的Topic

您可以在云消息隊列 Kafka 版控制臺手動創建MaxCompute Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、死信隊列Topic以及異常數據Topic。每個Topic所需要滿足的分區數與存儲引擎會有差異,具體信息,請參見配置源服務參數列表

  1. 登錄云消息隊列 Kafka 版控制臺
  2. 概覽頁面的資源分布區域,選擇地域。
    重要 Topic需要在應用程序所在的地域(即所部署的ECS的所在地域)進行創建。Topic不能跨地域使用。例如Topic創建在華北2(北京)這個地域,那么消息生產端和消費端也必須運行在華北2(北京)的ECS。
  3. 實例列表頁面,單擊目標實例名稱。
  4. 在左側導航欄,單擊Topic 管理
  5. Topic 管理頁面,單擊創建 Topic
  6. 創建 Topic面板,設置Topic屬性,然后單擊確定
    創建Topic
    參數說明示例
    名稱Topic名稱。demo
    描述Topic的簡單描述。demo test
    分區數Topic的分區數量。12
    存儲引擎
    說明 當前僅專業版實例支持選擇存儲引擎類型,標準版暫不支持,默認選擇為云存儲類型。
    Topic消息的存儲引擎。

    云消息隊列 Kafka 版支持以下兩種存儲引擎。

    • 云存儲:底層接入阿里云云盤,具有低時延、高性能、持久性、高可靠等特點,采用分布式3副本機制。實例的規格類型標準版(高寫版)時,存儲引擎只能為云存儲
    • Local 存儲:使用原生Kafka的ISR復制算法,采用分布式3副本機制。
    云存儲
    消息類型Topic消息的類型。
    • 普通消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,可能會造成消息亂序。當存儲引擎選擇云存儲時,默認選擇普通消息
    • 分區順序消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,仍然保證分區內按照發送順序存儲。但是會出現部分分區發送消息失敗,等到分區恢復后即可恢復正常。當存儲引擎選擇Local 存儲時,默認選擇分區順序消息
    普通消息
    日志清理策略Topic日志的清理策略。

    存儲引擎選擇Local 存儲(當前僅專業版實例支持選擇存儲引擎類型為Local存儲,標準版暫不支持)時,需要配置日志清理策略

    云消息隊列 Kafka 版支持以下兩種日志清理策略。

    • Delete:默認的消息清理策略。在磁盤容量充足的情況下,保留在最長保留時間范圍內的消息;在磁盤容量不足時(一般磁盤使用率超過85%視為不足),將提前刪除舊消息,以保證服務可用性。
    • Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會被保留。主要適用于系統宕機后恢復狀態,系統重啟后重新加載緩存等場景。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic存儲系統狀態信息或配置信息。
      重要 Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發請勿為Topic設置該屬性。具體信息,請參見云消息隊列 Kafka 版Demo庫
    Compact
    標簽Topic的標簽。demo
    創建完成后,在Topic 管理頁面的列表中顯示已創建的Topic。

創建MaxCompute Sink Connector依賴的Group

您可以在云消息隊列 Kafka 版控制臺手動創建MaxCompute Sink Connector數據同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體信息,請參見配置源服務參數列表

  1. 登錄云消息隊列 Kafka 版控制臺
  2. 概覽頁面的資源分布區域,選擇地域。
  3. 實例列表頁面,單擊目標實例名稱。
  4. 在左側導航欄,單擊Group 管理
  5. Group 管理頁面,單擊創建 Group
  6. 創建 Group面板的Group ID文本框輸入Group的名稱,在描述文本框簡要描述Group,并給Group添加標簽,單擊確定
    創建完成后,在Group 管理頁面的列表中顯示已創建的Group。

創建并部署MaxCompute Sink Connector

創建并部署用于將數據從云消息隊列 Kafka 版同步至MaxCompute的MaxCompute Sink Connector。

  1. 登錄云消息隊列 Kafka 版控制臺
  2. 概覽頁面的資源分布區域,選擇地域。
  3. 在左側導航欄,單擊Connector 任務列表
  4. Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector
  5. 創建 Connector配置向導面頁面,完成以下操作。
    1. 配置基本信息頁簽,按需配置以下參數,然后單擊下一步
      參數描述示例值
      名稱Connector的名稱。命名規則:
      • 可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字符。
      • 同一個云消息隊列 Kafka 版實例內保持唯一。

      Connector的數據同步任務必須使用名稱為connect-任務名稱Group。如果您未手動創建該Group,系統將為您自動創建。

      kafka-maxcompute-sink
      實例默認配置為實例的名稱與實例ID。demo alikafka_post-cn-st21p8vj****
    2. 配置源服務頁簽,選擇數據源消息隊列Kafka版,并配置以下參數,然后單擊下一步
      說明 如果您已創建好Topic和Group,那么請選擇手動創建資源,并填寫已創建的資源信息。否則,請選擇自動創建資源。
      表 1. 配置源服務參數列表
      參數描述示例值
      數據源 Topic需要同步數據的Topic。maxcompute-test-input
      消費線程并發數數據源Topic的消費線程并發數。默認值為6。取值說明如下:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      消費初始位置開始消費的位置。取值說明如下:
      • 最早位點:從最初位點開始消費。
      • 最近位點:從最新位點開始消費。
      最早位點
      VPC ID數據同步任務所在的VPC。單擊配置運行環境顯示該參數。默認為云消息隊列 Kafka 版實例所在的VPC,您無需填寫。vpc-bp1xpdnd3l***
      vSwitch ID數據同步任務所在的交換機。單擊配置運行環境顯示該參數。該交換機必須與云消息隊列 Kafka 版實例處于同一VPC。默認為部署云消息隊列 Kafka 版實例時填寫的交換機。vsw-bp1d2jgg81***
      失敗處理消息發送失敗后,是否繼續訂閱出現錯誤的Topic的分區。單擊配置運行環境顯示該參數。取值說明如下。
      • 繼續訂閱:繼續訂閱出現錯誤的Topic的分區,并打印錯誤日志。
      • 停止訂閱:停止訂閱出現錯誤的Topic的分區,并打印錯誤日志
      說明
      繼續訂閱
      創建資源方式選擇創建Connector所依賴的Topic與Group的方式。單擊配置運行環境顯示該參數。
      • 自動創建
      • 手動創建
      自動創建
      Connector 消費組Connector的數據同步任務使用的Group。單擊配置運行環境顯示該參數。該Groupp的名稱必須為connect-任務名稱connect-kafka-maxcompute-sink
      任務位點 Topic用于存儲消費位點的Topic。單擊配置運行環境顯示該參數。
      • Topic名稱:建議以connect-offset開頭。
      • 分區數:Topic的分區數量必須大于1。
      • 存儲引擎:Topic的存儲引擎必須為Local存儲。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-offset-kafka-maxcompute-sink
      任務配置 Topic用于存儲任務配置的Topic。單擊配置運行環境顯示該參數。
      • Topic名稱:建議以connect-config開頭。
      • 分區數:Topic的分區數量必須為1。
      • 存儲引擎:Topic的存儲引擎必須為Local存儲。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-config-kafka-maxcompute-sink
      任務狀態 Topic用于存儲任務狀態的Topic。單擊配置運行環境顯示該參數。
      • Topic名稱:建議以connect-status開頭。
      • 分區數:Topic的分區數量建議為6。
      • 存儲引擎:Topic的存儲引擎必須為Local存儲。
      • cleanup.policy:Topic的日志清理策略必須為compact。
      connect-status-kafka-maxcompute-sink
      死信隊列 Topic用于存儲Connect框架的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和異常數據Topic為同一個Topic,以節省Topic資源。
      • Topic名稱:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
      connect-error-kafka-maxcompute-sink
      異常數據 Topic用于存儲Sink的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和死信隊列Topic為同一個Topic,以節省Topic資源。
      • Topic名稱:建議以connect-error開頭。
      • 分區數:Topic的分區數量建議為6。
      • 存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
      connect-error-kafka-maxcompute-sink
    3. 配置目標服務頁簽,選擇目標服務大數據計算服務,并配置以下參數,然后單擊創建
      說明 如果Connector所屬實例的地域為華東1(杭州)或西南1(成都),選擇目標服務大數據計算服務時, 會分別彈出創建服務關聯角色AliyunServiceRoleForEventBridgeSourceKafkaAliyunServiceRoleForEventBridgeConnectVPC服務授權對話框,在彈出的服務授權對話框中單擊確認,然后再配置以下參數并單擊創建。如果服務關聯角色已創建,則不再重復創建,即不會再彈出服務授權對話框。
      參數描述示例值
      連接地址MaxCompute的服務接入點。更多信息,請參見Endpoint
      • VPC網絡Endpoint:低延遲,推薦。適用于云消息隊列 Kafka 版實例和MaxCompute處于同一地域場景。
      • 外網Endpoint:高延遲,不推薦。適用于云消息隊列 Kafka 版實例和MaxCompute處于不同地域的場景。如需使用公網Endpoint,您需要為Connector開啟公網訪問。更多信息,請參見為Connector開啟公網訪問
      http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
      工作空間MaxCompute的工作空間。connector_test
      MaxCompute的表。test_kafka
      表地域MaxCompute表所在地域。華東1(杭州)
      服務賬號MaxCompute的阿里云賬號ID。188***
      授權角色名云消息隊列 Kafka 版的RAM角色的名稱。更多信息,請參見創建RAM角色AliyunKafkaMaxComputeUser1
      模式消息同步到Connector的模式。默認為DEFAULT。取值說明如下:
      • KEY:只保留消息的Key,并將Key寫入MaxCompute表的key列。
      • VALUE:只保留消息的Value,并將Value寫入MaxCompute表的value列。
      • DEFAULT:同時保留消息的Key和Value,并將Key和Value分別寫入MaxCompute表的key列和value列。
        重要 DEFAULT模式下,不支持選擇CSV格式,只支持TEXT格式和BINARY格式。
      DEFAULT
      格式消息同步到Connector的格式。默認為TEXT。取值說明如下:
      • TEXT:消息的格式為字符串。
      • BINARY:消息的格式為字節數組。
      • CSV:消息的格式為逗號(,)分隔的字符串。
        重要 CSV格式下,不支持DEFAULT模式,只支持KEY模式和VALUE模式:
        • KEY模式:只保留消息的Key,根據逗號(,)分隔Key字符串,并將分隔后的字符串按照索引順序寫入表。
        • VALUE模式:只保留消息的Value,根據逗號(,)分隔Value字符串,并將分隔后的字符串按照索引順序寫入表。
      TEXT
      分區分區的粒度。默認為HOUR。取值說明如下:
      • DAY:每天將數據寫入一個新分區。
      • HOUR:每小時將數據寫入一個新分區。
      • MINUTE:每分鐘將數據寫入一個新分區。
      HOUR
      時區向Connector的數據源Topic發送消息的云消息隊列 Kafka 版生產者客戶端所在時區。默認為GMT 08:00。GMT 08:00
      創建完成后,在Connector 任務列表頁面,查看創建的Connector 。
  6. 創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署

發送測試消息

部署MaxCompute Sink Connector后,您可以向云消息隊列 Kafka 版的數據源Topic發送消息,測試數據能否被同步至MaxCompute。

  1. Connector 任務列表頁面,找到目標Connector,在其右側操作列,單擊測試
  2. 發送消息面板,發送測試消息。
    • 發送方式選擇控制臺
      1. 消息 Key文本框中輸入消息的Key值,例如demo。
      2. 消息內容文本框輸入測試的消息內容,例如 {"key": "test"}。
      3. 設置發送到指定分區,選擇是否指定分區。
        • 單擊,在分區 ID文本框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態
        • 單擊,不指定分區。
    • 發送方式選擇Docker,執行運行 Docker 容器生產示例消息區域的Docker命令,發送消息。
    • 發送方式選擇SDK,根據您的業務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK發送消息。

查看表數據

云消息隊列 Kafka 版的數據源Topic發送消息后,在MaxCompute客戶端查看表數據,驗證是否收到消息。

查看本文寫入的test_kafka的示例步驟如下:

  1. 登錄MaxCompute客戶端。
  2. 執行以下命令查看表的數據分區。
    show partitions test_kafka;
    返回結果示例如下:
    pt=11-17-2020 15
    
    OK
  3. 執行以下命令查看分區的數據。
    select * from test_kafka where pt ="11-17-2020 14";
    返回結果示例如下:
    +----------------------+------------+------------+-----+-------+---------------+
    | topic                | partition  | offset     | key | value | pt            |
    +----------------------+------------+------------+-----+-------+---------------+
    | maxcompute-test-input| 0          | 0          | 1   | 1     | 11-17-2020 14 |
    +----------------------+------------+------------+-----+-------+---------------+