日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

實時消費Kafka數據

當您需要將Kafka數據寫入云原生數據倉庫AnalyticDB PostgreSQL版,且不希望使用其他數據集成工具時,可以通過實時數據消費功能直接消費Kafka數據,減少實時處理組件依賴,提升寫入吞吐。

Apache Kafka是一個容錯、低延遲、分布式的發布-訂閱消息系統。Streaming Server支持從Apache和Confluent Kafka發行版中加載Kafka數據。通過云原生數據倉庫AnalyticDB PostgreSQL版可讀外表對Kafka數據進行轉換,并將數據寫入云原生數據倉庫AnalyticDB PostgreSQL版目標表中。

前提條件

  • Kafka服務與云原生數據倉庫AnalyticDB PostgreSQL版實例需在同一專有網絡(VPC)。

    重要

    如果Kafka服務與云原生數據倉庫AnalyticDB PostgreSQL版實例屬于同一專有網絡但是不在同一交換機(vSwitch)下,您需要進行如下操作:

    • 將Kafka服務所在交換機的IPv4網段添加至云原生數據倉庫AnalyticDB PostgreSQL版實例白名單中。具體操作,請參見設置白名單。

    • 云原生數據倉庫AnalyticDB PostgreSQL版實例所在交換機的IPv4網段添加至Kafka服務白名單中。具體操作,請參見配置白名單。

  • 已在Kafka服務中生成了大量樣例數據。本文以阿里云云消息隊列Kafka版為例,具體信息如下。

    • 接入點信息:alikafka-post-cn-wwo3hflb****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-3-vpc.alikafka.aliyuncs.com:9092

    • Topic:test_topic

    • consumer group:test_consumer_group

  • 已在云原生數據倉庫AnalyticDB PostgreSQL版中創建目標用戶和目標表,同時在任務中使用的數據庫用戶需要具備以下權限。

    • 使用gpfdist協議創建只讀外表的權限。

    • 任務中配置的數據庫Schema的USAGE和CREATE權限。

    • 任務中配置的寫入目標表的SELECT和INSERT權限。

    本文以liss_test用戶和liss_test.liss_test_plaintext表為例。

    CREATE role liss_test with login;
    ALTER role liss_test with password 'lissTest****';
    ALTER role liss_test CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist'); 
    
    \c - liss_test
    CREATE DATABASE liss_test;
    \c liss_test
    CREATE SCHEMA liss_test;
    
    CREATE TABLE liss_test.liss_test_plaintext (
    column_1 varchar(32),
    column_2 bigint,
    column_3 numeric,
    column_4 varchar(32),
    column_5 varchar(32)
    ) distributed by (column_1, column_2);

使用限制

  • 云原生數據倉庫 AnalyticDB PostgreSQL 版6.0實例需為v6.6.0及以上版本。云原生數據倉庫 AnalyticDB PostgreSQL 版7.0實例需為v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式實例暫不支持。

  • 實時數據消費目前僅支持INSERT、MERGE(UPSERT)、UPDATE三種語法,暫不支持DELETE與READ。

  • 使用MERGE(UPSERT)或UPDATE時,需要云原生數據倉庫 AnalyticDB PostgreSQL 版表有主鍵索引。

  • 使用實時數據消費,不同分區(Partition)之間需要使用主鍵列做分區因子,否則可能會造成全局死鎖錯誤,導致部分數據更新失敗。

  • 實時數據消費當前僅支持Kafka消息隊列,暫不支持CDC格式的數據源。

  • 當前的版本向導模式支持CSV和Delimited兩種格式的數據源,專業模式支持CSV、Delimited和protobuf三種格式的數據源。

操作步驟

步驟一:開啟實時數據服務

  1. 登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺。

  2. 在控制臺左上角,選擇實例所在地域。

  3. 找到目標實例,單擊實例ID。

  4. 在控制臺左側導航欄單擊實時數據消費,再單擊左上角開啟實時數據服務

    image

  5. 在彈出的對話框中填寫名稱服務描述并單擊確定。開通完成后,可在控制臺看到服務狀態連接信息。

    image

    說明

    服務規格當前不可選,默認為8CU。

步驟二:新增實時數據源

  1. 登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺。
  2. 在控制臺左上角,選擇實例所在地域。
  3. 找到目標實例,單擊實例ID。
  4. 在左側導航欄,單擊實時數據消費。

  5. 實時數據源卡片中,單擊新增數據源,并完成以下配置。

    配置項

    描述

    關聯數據服務

    在下拉框中選擇已創建的實時數據服務。

    數據源名稱

    自定義數據源名稱。

    數據源描述

    自定義數據源描述。

    數據源類型

    目前僅支持Kafka。

    brokers

    Kafka接入點信息。

    • 阿里云的Kafka服務,可登錄阿里云控制臺獲取默認接入點。具體操作,請參見查看接入點。

    • 自建的kafka服務,Brokers需要填寫Kafka服務具體的`hostname:port``ip:port`信息。

    topic

    Kafka的Topic名稱。

    format

    當前版本向導模式支持CSV和Delimited兩種格式的數據源,專業模式支持CSV、Delimited和protobuf三種格式的數據源。

    列分隔符

    可設置任意單字符分隔符。

  6. 單擊確定

步驟三:新增實時任務

  1. 實時任務卡片中,單擊新增實時任務,并完成以下配置。

    請根據業務需要選擇向導模式專業模式。

    向導模式:可以通過控制臺中的指引來快速搭建任務。

    專業模式:可以通過提交YAML的方式向Streaming Server提交任務,功能相比于向導模式更豐富。

    向導模式

    配置項

    描述

    基本信息

    任務名稱

    定義任務的名稱,任務名稱不可以重復,必填。

    任務描述

    描述任務內容,選填。

    配置模式

    向導模式。

    源端配置

    數據源

    選擇在新增實時數據源中配置的數據源,目前僅支持Kafka為源的數據源。

    group_name

    Kafka的消費者組。

    failback_offset

    消費位點。

    • earliest:從最早可用位點消費。

    • latest:從最新的位點開始消費。

    投遞保證

    流計算中的一致性語義,支持:

    • ATLEAST:在Kafka中的數據至少有一次被寫入云原生數據倉庫AnalyticDB PostgreSQL版。

    • EXACTLY:在Kafka中的數據有且僅有一次被寫入云原生數據倉庫AnalyticDB PostgreSQL版。

    目標端配置

    目標庫

    需要寫入的云原生數據倉庫AnalyticDB PostgreSQL版目標數據庫名稱。

    Schema

    云原生數據倉庫AnalyticDB PostgreSQL版的模式名稱。

    目標表

    需要寫入的云原生數據倉庫AnalyticDB PostgreSQL版目標表名稱。

    賬號

    當前任務使用的云原生數據倉庫AnalyticDB PostgreSQL版數據庫賬號。

    密碼

    賬號密碼。

    寫入模式

    目前僅支持INSERT、UPDATE和MERGE三種寫入模式。

    • INSERT:將數據直接寫入目標表。

    • UPDATE:當輸入列中的MatchColumns與目標表中的列匹配,更新UpdateColumns中列出的目標表列。

    • MERGE:當寫入數據與目標表列的值相等時,使用寫入數據更新目標表列的現有數據。當寫入數據與目標表列的值不相等時,直接將數據寫入目標表。MERGE寫入模式可類比于UPSERT(UPDATE and INSERT),關于UPSERT的寫入方式,請參見使用INSERT ON CONFLICT覆蓋寫入數據。

    說明

    MatchColumns與UpdateColumns的含義請參見下文字段類型的描述。

    ErrorLimitCount

    錯誤數據的容忍閾值。當寫入的錯誤數據到達ErrorLimitCount時,Streaming Server會自動停止將數據源的數據寫入云原生數據倉庫AnalyticDB PostgreSQL版。0表示Streaming Server遇到第一次錯誤數據時就會停止數據寫入。目前該參數未啟用,填0即可。

    字段映射

    源字段

    Kafka消息中的Value字段名,需要按照在Value中出現的順序指定所有的字段名。

    目標字段

    云原生數據倉庫AnalyticDB PostgreSQL版目標表的字段名。

    字段類型

    目前支持以下三種類型:

    • MatchColumns:作為寫入時的Join條件列作為更新條件,用于判斷目標表中哪些行需要被更新。

    • UpdateColumns:如果某一行數據符合更新條件,那么在UpdateColumns中的列會被更新為新的值。

    • 空(不填):即使某一行數據符合更新條件,該字段也不會被更新為新的值。

    在UPDATE和MERGE寫入時,Streaming Server會先將數據寫入一個臨時表,然后利用MatchColumns作為條件列與目標表進行Join:

    • 如果有匹配的數據,則會更新UpdateColumns中的數據。

    • 如果沒有匹配的數據時,則會根據寫入模式有以下兩種情況:

      • 寫入模式為UPDATE時,數據不會被寫入。

      • 寫入模式為MERGE時,數據會被寫入。

    專業模式

    配置項

    描述

    基本信息

    任務名稱

    定義任務的名稱,任務名稱不可以重復,必填。

    任務描述

    描述任務內容,選填。

    配置模式

    專業模式。

    數據源

    選擇在新增實時數據源中配置的數據源,目前僅支持Kafka為源的數據源。

    YAML

    可以通過YAML配置更復雜的寫入邏輯。本文的YAML配置示例如下。更多詳情,請參見附錄:YAML配置說明

    DATABASE: liss_test
    USER: liss_test
    PASSWORD: lissTest****
    HOST: gp-2ze517f9l7****-master.gpdb.rds-aliyun-pre.rds.aliyuncs.com
    PORT: 5432
    KAFKA:
      INPUT:
        SOURCE:
          BROKERS: alikafka-post-cn-wwo3hflbo002-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-3-vpc.alikafka.aliyuncs.com:9092
          TOPIC: test_topic
          FALLBACK_OFFSET: EARLIEST
        VALUE:
          COLUMNS:
          - NAME: column_1
            TYPE: varchar(32)
          - NAME: column_2
            TYPE: bigint
          - NAME: column_3
            TYPE: numeric
          - NAME: column_4
            TYPE: varchar(32)
          - NAME: column_5
            TYPE: varchar(32)
          FORMAT: delimited
          DELIMITED_OPTION:
            DELIMITER: "|"
        ERROR_LIMIT: 20
      OUTPUT:
        SCHEMA: liss_test
        TABLE: liss_test_plaintext
        MODE: MERGE
        MATCH_COLUMNS:
        - column_1
        - column_2
        UPDATE_COLUMNS:
        - column_3
        - column_4
        - column_5
        MAPPING:
        - NAME: column_1
          EXPRESSION: column_1
        - NAME: column_2
          EXPRESSION: column_2
        - NAME: column_3
          EXPRESSION: column_3
        - NAME: column_4
          EXPRESSION: column_4
        - NAME: column_5
          EXPRESSION: column_5
      COMMIT:
        MAX_ROW: 1000
        MINIMAL_INTERVAL: 1000
        CONSISTENCY: ATLEAST
      POLL:
        BATCHSIZE: 1000
        TIMEOUT: 1000
      PROPERTIES:
        group.id: test_consumer_group
  2. 單擊確定,并等待實時任務狀態為運行中,即可將數據源中的數據寫入云原生數據倉庫AnalyticDB PostgreSQL版

在任務啟動后會在目標端配置的Schema(專業模式為METADATA.SCHEMA中配置的schema)下生成任務的兩種輔助表,其格式分別為:

  • lissext_$UID:本任務定義的gpfdist外表,用于將數據寫入云原生數據倉庫AnalyticDB PostgreSQL版。

  • lisskafka_mission_info_$UID:用于存儲任務當前位點推進的情況,保障數據寫入的一致性。目前為了保障寫入任務的高可用,每個寫入任務會生成4個子任務,所以每啟動一個寫入任務,會生成4張表。

  • UID是每個寫入任務的唯一標識ID。

附錄:YAML配置說明

YAML配置文件格式如下。

DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <adbpg_port>
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: <kafka_broker_host:broker_port> [, ... ]
        TOPIC: <kafka_topic>
        [PARTITIONS: (<partition_numbers>)]
        [FALLBACK_OFFSET: { earliest | latest }]
      [VALUE:
        COLUMNS:
           - NAME: { <column_name> }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <value_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string>
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
      [KEY:
        COLUMNS:
           - NAME: { <column_name> }
             TYPE: <column_data_type>
           [ ... ]
         FORMAT: <key_data_format>
         [[DELIMITED_OPTION:
            DELIMITER: <delimiter_string> |
            [QUOTE: <quote_char>]
            [ESCAPE: <escape_char>] ] |
         [CSV_OPTION:
            [DELIMITER: <delim_char>]
            [QUOTE: <quote_char>]
            [NULL_STRING: <nullstr_val>]
            [ESCAPE: <escape_char>]
      [META:
         COLUMNS:
            - NAME: <meta_column_name>
              TYPE: { json | jsonb }
         FORMAT: json]
      [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
   { OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [MODE: <mode>]
      [MATCH_COLUMNS: 
         - <match_column_name>
         [ ... ]]
      [ORDER_COLUMNS: 
         - <order_column_name>
         [ ... ]]
      [UPDATE_COLUMNS: 
         - <update_column_name>
         [ ... ]]
      [MAPPING: 
         - NAME: <target_column_name>
           EXPRESSION: { <source_column_name> | <expression> } 
         [ ... ]
           |
         <target_column_name> : { <source_column_name> | <expression> }
         [ ... ] ] }
   [METADATA:
      [SCHEMA: <metadata_schema_name>]]
   COMMIT:
      MAX_ROW: <num_rows>
      MINIMAL_INTERVAL: <wait_time>
      CONSISTENCY: { strong | at-least | at-most | none }
   [POLL:
      BATCHSIZE: <num_records>
      TIMEOUT: <poll_time>]
   [PROPERTIES:
      <kafka_property_name>: <kafka_property_value>
      [ ... ]]
[SCHEDULE:
   RETRY_INTERVAL: <retry_time>
   MAX_RETRIES: <num_retries> ]

數據庫相關配置

參數

描述

是否必填

DATABASE

目標端云原生數據倉庫AnalyticDB PostgreSQL版實例的數據庫名稱。

USER

云原生數據倉庫AnalyticDB PostgreSQL版實例的賬號。

PASSWORD

云原生數據倉庫AnalyticDB PostgreSQL版實例的賬號密碼。

HOST

目標端云原生數據倉庫AnalyticDB PostgreSQL版實例的內網地址。

PORT

云原生數據倉庫AnalyticDB PostgreSQL版實例的端口號。

VERSION

當前采用的YAML文件格式版本,預留字段,無限制。

KAFKA:INPUT配置

KAFKA:INPUT:SOURCE

參數

描述

是否必填

參數值限制

BROKERS

Kafka接入點信息。

  • 阿里云的Kafka服務,可登錄阿里云控制臺獲取默認接入點。具體操作,請參見查看接入點。

  • 自建的Kafka服務,Brokers需要填寫Kafka服務具體的ip:port信息。

如有多個使用英文逗號(,)進行分隔。

對應kafka consumer bootstrap.server 配置,需要填寫有效的Brokers地址,否則會報錯。

TOPIC

Kafka Topic名稱。

僅支持單個Topic。

PARTITIONS

分區編號。

如有多個分區編號,使用英文逗號(,)進行分隔。如果在PROPERTIES中配置了group.id,那么該參數會被忽略。

例如:1,2,3,4,5

FALLBACK_OFFSET

消費位點。

  • earliest:從最早可用位點消費。

  • latest:從最新的位點開始消費。

KAFKA:INPUT:KEY和KAFKA:INPUT:VALUE

Kafka消息的Key值字段名稱、數據類型和數據格式。

Kafka消息的Value字段名稱、數據類型和數據格式。

必須按照在Key和Value中出現的順序指定所有Kafka數據元素。

KAFKA:INPUT:KEYKAFKA:INPUT:VALUE至少需要配置一個,如果兩個都未配置會報錯。

參數

描述

是否必填

參數值限制

COLUMNS

如果定義KAFKA:INPUT:KEY,則用于定義Kafka消息中Key部分的列名與類型;

如果定義KAFKA:INPUT:VALUE,則用于定義Kafka消息中Value部分的列名與類型。

NAME

定義Kafka消息中的列名。該列名主要在KAFKA:OUTPUT:MAPPING中使用,用于標記Kafka消息中的數據列。

TYPE

定義Kafka消息中列的類型,數據類型需要與這個列在目標數據庫中的類型保持一致。

由于Kafka消息中Key和Value的格式不透明,因此當前Streaming Server默認從Kafka消息中獲取的數據格式為文本形式。

云原生數據倉庫AnalyticDB PostgreSQL版支持的數據類型請參見數據類型。

如果Kafka消息的列與目標列的類型不一致,請在Mapping中的expression部分對類型進行轉換。

FORMAT

定義Kafka消息數據的類型,當前支持CSV、Delimited和protobuf。

KAFKA:INPUT:META

META不是必填項,當您需要展示Message Meta信息時配置。

參數

描述

是否必填

參數值限制

COLUMNS

定義Meta信息,為一組NAME,TYPE。

NAME

Meta名稱,可以指定為其他的名稱,默認使用meta

TYPE

只能使用Text類型。

Text

FORMAT

只能使用Text類型。

Text

KAFKA:INPUT:ERROR_LIMIT

錯誤數據的容忍閾值。當寫入的錯誤數據達到ERROR_LIMIT時,Streaming Server會退出當前任務,自動停止將數據源的數據寫入云原生數據倉庫AnalyticDB PostgreSQL版。默認值為0,即Streaming Server會在出現第一次錯誤數據時就退出當前任務,停止數據寫入。ERROR_LIMIT值必須大于1。

目前該參數未啟用,不選擇或者填0即可。

KAFKA:OUTPUT配置

數據庫相關配置

數據寫入到云原生數據倉庫AnalyticDB PostgreSQL版數據庫的相關配置,包括Kafka值到目標數據庫的映射、寫入模式等。

參數

描述

是否必填

SCHEMA

寫入云原生數據倉庫AnalyticDB PostgreSQL版的目標表所在的Schema。

TABLE

目標表的名稱。

MODE

寫入模式,目前支持INSERT、UPDATE和MERGE三種方式。

MATCH_COLUMNS

當寫入模式為UPDATE和MERGE時生效。

指定目標表的部分列,當寫入數據與目標表數據匹配時,目標表中這部分數據會根據UPDATE或MERGE模式對數據進行更新。

建議MATCH_COLUMNS使用目標表的主鍵或者唯一鍵。

ORDER_COLUMNS

在寫入模式(MODE)為MERGE時生效。

當寫入數據根據MATCH_COLUMNS存在多個匹配行時,使用ORDER_COLUMNS對這些數據進行排序,以確定具有最大值的輸入行,Streaming Server使用該行來更新目標。

UPDATE_COLUMNS

當寫入模式為UPDATE和MERGE時生效。

如果寫入數據能夠根據MATCH_COLUMNS匹配到目標表數據,則會基于UPDATE_COLUMNS更新對應的列。

說明
  • 在使用MERGE和UPDATE模式時,如果不指定ORDER_COLUMNS,當寫入數據根據MATCH_COLUMNS匹配到多行相同時,則會隨機取一條作為結果寫入。

  • 在指定了ORDER_COLUMNS后,其排序結果是a desc,b desc,c desc。

KAFKA:OUTPUT:MAPPING

參數

描述

是否必填

NAME

目標列名稱。

EXPRESSION

可以是源端的列名(KAFKA:INPUT:VALUE:COLUMNS中定義的列名),或者一個表達式。例如NAME : targetColumnName Expression: input + 1,其效果等效于SELECT input + 1 AS targetColumnName FROM xxx。

KAFKA:METADATA配置

參數

描述

是否必填

參數限制

schema

Streaming Server創建的外表和其他輔助表所在的Schema名稱。

默認取值KAFKA:OUTPUT中的Schema。

KAFKA:COMMIT配置

COMMIT用于控制向數據庫提交數據的行為。

參數

描述

是否必填

參數限制

MAX_ROW

指定一次寫入目標庫的最大Batch Size。

單位為行,默認:500。

MINIMAL_INTERVAL

在兩個Batch寫入之間的等待時間。如果超過該時間,會嘗試再寫一次。

單位為毫秒(ms),默認:1000。

CONSISTENCY

數據一致性保證。

目前僅支持ATLEAST,即kafka中的數據至少會寫入目標數據庫一次。

KAFKA:POLL配置

POLL用于控制Kafka Consumer消費數據的行為。

參數

描述

是否必填

參數限制

BATCHSIZE

一次從Topic中拿出的event數量。保留字段,目前沒有實現相關功能。

單位為行,默認:64。

TIMEOUT

Kafka Consumer從Kafka中獲取event等待的超時時間。

單位為毫秒(ms),默認:5000。

KAFKA:PROPERTIES配置

PROPERTIES用于配置Kafka Connect,當前采用白名單制,僅支持配置group.id,auto.offset.resetisolation.level。詳細信息,請參見Kafka Connect Configs。