日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Kafka數(shù)據(jù)遷移MaxCompute最佳實(shí)踐

本文為您介紹如何使用DataWorks數(shù)據(jù)集成,將Kafka集群上的數(shù)據(jù)遷移至MaxCompute。

前提條件

  • 開通MaxCompute和DataWorks

  • 新增MaxCompute數(shù)據(jù)源。詳情請參見創(chuàng)建MaxCompute數(shù)據(jù)源

  • 在DataWorks上完成創(chuàng)建業(yè)務(wù)流程,本例使用DataWorks簡單模式。詳情請參見創(chuàng)建業(yè)務(wù)流程

  • 搭建Kafka集群

    進(jìn)行數(shù)據(jù)遷移前,您需要保證自己的Kafka集群環(huán)境正常。本文使用阿里云EMR服務(wù)自動(dòng)化搭建Kafka集群,詳細(xì)過程請參見Kafka快速入門

    本文使用的EMR Kafka版本信息如下:

    • EMR版本:EMR-3.12.1

    • 集群類型:Kafka

    • 軟件信息:Ganglia 3.7.2,ZooKeeper 3.4.12,Kafka 2.11-1.0.1,Kafka-Manager 1.3.X.XX

    Kafka集群使用專有網(wǎng)絡(luò),區(qū)域?yàn)槿A東1(杭州),主實(shí)例組ECS計(jì)算資源配置公網(wǎng)及內(nèi)網(wǎng)IP。

背景信息

Kafka是一款分布式發(fā)布與訂閱的消息中間件,具有高性能、高吞量的特點(diǎn)被廣泛使用,每秒能處理上百萬的消息。Kafka適用于流式數(shù)據(jù)處理,主要應(yīng)用于用戶行為跟蹤、日志收集等場景。

一個(gè)典型的Kafka集群包含若干個(gè)生產(chǎn)者(Producer)、Broker、消費(fèi)者(Consumer)以及一個(gè)Zookeeper集群。Kafka集群通過Zookeeper管理自身集群的配置并進(jìn)行服務(wù)協(xié)同。

Topic是Kafka集群上最常用的消息的集合,是一個(gè)消息存儲(chǔ)邏輯概念。物理磁盤不存儲(chǔ)Topic,而是將Topic中具體的消息按分區(qū)(Partition)存儲(chǔ)在集群中各個(gè)節(jié)點(diǎn)的磁盤上。每個(gè)Topic可以有多個(gè)生產(chǎn)者向它發(fā)送消息,也可以有多個(gè)消費(fèi)者向它拉取(消費(fèi))消息。

每個(gè)消息被添加到分區(qū)時(shí),會(huì)分配一個(gè)Offset(偏移量,從0開始編號),是消息在一個(gè)分區(qū)中的唯一編號。

步驟一:準(zhǔn)備Kafka數(shù)據(jù)

您需要在Kafka集群創(chuàng)建測試數(shù)據(jù)。為保證您可以順利登錄EMR集群Header主機(jī),以及保證MaxCompute和DataWorks可以順利和EMR集群Header主機(jī)通信,請您首先配置EMR集群Header主機(jī)安全組,放行TCP 22及TCP 9092端口。

  1. 登錄EMR集群Header主機(jī)地址。

    1. 進(jìn)入EMR Hadoop控制臺(tái)。

    2. 在頂部導(dǎo)航欄,單擊集群管理

    3. 在顯示的頁面,找到您需要?jiǎng)?chuàng)建測試數(shù)據(jù)的集群,進(jìn)入集群詳情頁。

    4. 在集群詳情頁面,單擊主機(jī)列表,確認(rèn)EMR集群Header主機(jī)地址,并通過SSH連接遠(yuǎn)程登錄。

  2. 創(chuàng)建測試Topic。

    執(zhí)行如下命令創(chuàng)建測試所使用的Topic testkafka。

    kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
  3. 寫入測試數(shù)據(jù)。

    執(zhí)行如下命令,可以模擬生產(chǎn)者向Topic testkafka中寫入數(shù)據(jù)。由于Kafka用于處理流式數(shù)據(jù),您可以持續(xù)不斷地向其中寫入數(shù)據(jù)。為保證測試結(jié)果,建議寫入10條以上的數(shù)據(jù)。

    kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka

    您可以同時(shí)再打開一個(gè)SSH窗口,執(zhí)行如下命令,模擬消費(fèi)者驗(yàn)證數(shù)據(jù)是否已成功寫入Kafka。當(dāng)數(shù)據(jù)寫入成功時(shí),您可以看到已寫入的數(shù)據(jù)。

    kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning

步驟二:在DataWorks上創(chuàng)建目標(biāo)表

在DataWorks上創(chuàng)建目標(biāo)表用以接收Kafka數(shù)據(jù)。

  1. 進(jìn)入數(shù)據(jù)開發(fā)頁面。

    1. 登錄DataWorks控制臺(tái)

    2. 單擊左側(cè)導(dǎo)航欄數(shù)據(jù)建模與開發(fā) > 數(shù)據(jù)開發(fā)

    3. 在下拉框中選擇對應(yīng)工作空間后單擊進(jìn)入數(shù)據(jù)開發(fā)

  2. 右鍵單擊業(yè)務(wù)流程,選擇新建 > MaxCompute >

  3. 在彈出的新建表對話框中,填寫表名稱,并單擊新建

    說明
    • 表名必須以字母開頭,不能包含中文或特殊字符。

    • 如果在數(shù)據(jù)開發(fā)中綁定多個(gè)MaxCompute數(shù)據(jù)源,則按需選擇MaxCompute引擎實(shí)例

  4. 在表的編輯頁面,單擊DDL模式

  5. DDL對話框中,輸入如下建表語句,單擊生成表結(jié)構(gòu)

    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;

    其中的每一列,對應(yīng)于DataWorks數(shù)據(jù)集成Kafka Reader的默認(rèn)列:

    • __key__表示消息的key。

    • __value__表示消息的完整內(nèi)容 。

    • __partition__表示當(dāng)前消息所在分區(qū)。

    • __headers__表示當(dāng)前消息headers信息。

    • __offset__表示當(dāng)前消息的偏移量。

    • __timestamp__表示當(dāng)前消息的時(shí)間戳。

    您還可以自主命名,詳情參見Kafka Reader

  6. 單擊提交到生產(chǎn)環(huán)境確認(rèn)

步驟三:同步數(shù)據(jù)

  1. 新建獨(dú)享數(shù)據(jù)集成資源組。

    由于當(dāng)前DataWorks的公共資源組無法完美支持Kafka插件,您需要使用獨(dú)享數(shù)據(jù)集成資源組完成數(shù)據(jù)同步。詳情請參見新增和使用獨(dú)享數(shù)據(jù)集成資源組

  2. 新建數(shù)據(jù)集成節(jié)點(diǎn)。

    1. 進(jìn)入數(shù)據(jù)開發(fā)頁面,右鍵單擊指定業(yè)務(wù)流程,選擇新建節(jié)點(diǎn) > 數(shù)據(jù)集成 > 離線同步

    2. 新建節(jié)點(diǎn)對話框中,輸入節(jié)點(diǎn)名稱,并單擊確認(rèn)

  3. 在頂部菜單欄上,單擊轉(zhuǎn)化腳本圖標(biāo)。

  4. 在腳本模式下,單擊頂部菜單欄上的**圖標(biāo)。

  5. 配置腳本,示例代碼如下。

    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_source",// MaxCompute數(shù)據(jù)源名稱
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1
            }
        }
    }

    您可以通過在Header主機(jī)上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list命令查看group.id參數(shù),及消費(fèi)者的Group名稱。

    • 命令示例

      kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    • 返回結(jié)果

      _emr-client-metrics-handler-group
      console-consumer-69493
      console-consumer-83505
      console-consumer-21030
      console-consumer-45322
      console-consumer-14773

    console-consumer-83505為例,您可以根據(jù)該參數(shù)在Header主機(jī)上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505命令確認(rèn)beginOffsetendOffset參數(shù)。

    • 命令示例。

      kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    • 返回結(jié)果

      TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
      testkafka                      6          0               0               0          -                                                 -                              -
      test                           6          3               3               0          -                                                 -                              -
      testkafka                      0          0               0               0          -                                                 -                              -
      testkafka                      1          1               1               0          -                                                 -                              -
      testkafka                      5          0               0               0          -                                                 -                              -
  6. 配置調(diào)度資源組。

    1. 在節(jié)點(diǎn)編輯頁面的右側(cè)導(dǎo)航欄,單擊調(diào)度配置

    2. 資源屬性區(qū)域,選擇調(diào)度資源組為您創(chuàng)建的獨(dú)享數(shù)據(jù)集成資源組。

      說明

      如果您需要將Kafka的數(shù)據(jù)周期性(例如每小時(shí))寫入MaxCompute,您可以使用beginDateTimeendDateTime參數(shù),設(shè)置數(shù)據(jù)讀取的時(shí)間區(qū)間為1小時(shí),然后每小時(shí)調(diào)度一次數(shù)據(jù)集成任務(wù)。詳情請參見Kafka Reader

  7. 單擊**圖標(biāo)運(yùn)行代碼。

  8. 您可以在運(yùn)行日志查看運(yùn)行結(jié)果。

后續(xù)步驟

您可以新建一個(gè)數(shù)據(jù)開發(fā)任務(wù)運(yùn)行SQL語句,查看當(dāng)前表中是否已存在從云消息隊(duì)列 Kafka 版同步過來的數(shù)據(jù)。本文以select * from testkafka為例,具體步驟如下:

  1. 進(jìn)入數(shù)據(jù)開發(fā)頁面。

    1. 登錄DataWorks控制臺(tái)

    2. 在左側(cè)導(dǎo)航欄,單擊工作空間列表

    3. 在目標(biāo)工作空間的操作列中,單擊快速進(jìn)入,選擇數(shù)據(jù)開發(fā)

  2. 臨時(shí)查詢面板,右鍵單擊臨時(shí)查詢,選擇新建節(jié)點(diǎn) > ODPS SQL

  3. 新建節(jié)點(diǎn)對話框中,輸入名稱

  4. 單擊確認(rèn)

  5. 在創(chuàng)建的節(jié)點(diǎn)頁面,輸入select * from testkafka,單擊image圖標(biāo),運(yùn)行完成后,查看運(yùn)行日志。

    image