日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

大數據計算服務MaxCompute

更新時間:

本文為您介紹大數據計算服務MaxCompute連接器的語法結構、WITH參數和使用示例等。

背景信息

大數據計算服務MaxCompute(原名ODPS)是一種快速、完全托管的EB級數據倉庫解決方案,致力于批量結構化數據的存儲和計算,提供海量數據倉庫的解決方案及分析建模服務。MaxCompute的詳情請參見什么是MaxCompute。

MaxCompute連接器支持的信息如下。

類別

詳情

支持類型

源表、維表和結果表

運行模式

流模式和批模式

數據格式

暫不支持

特有監控指標

  • 源表

    numRecordsIn:源表當前讀取到的數據總條數。

    numRecordsInPerSecond:源表當前每秒讀取的數據條數。

    numBytesIn:源表當前讀取到的數據總字節數(解壓縮后)。

    numBytesInPerSecond:源表當前每秒讀取的數據字節數(解壓縮后)。

  • 結果表

    numRecordsOut:結果表當前寫出的數據總條數。

    numRecordsOutPerSecond:結果表當前每秒寫出的數據條數。

    numBytesOut:結果表當前寫出的數據總字節數(壓縮前)。

    numBytesOutPerSecond:結果表當前每秒寫出的數據字節數(壓縮前)。

  • 維表

    dim.odps.cacheSize:維表緩存的數據條數。

說明

指標含義詳情,請參見監控指標說明。

API種類

Datastream和SQL

是否支持更新或刪除結果表數據

Batch Tunnel和Stream Tunnel模式僅支持插入數據,Upsert Tunnel模式支持插入、更新和刪除數據。

前提條件

已創建MaxCompute表,詳情請參見創建表。

使用限制

  • 僅實時計算引擎VVR 2.0.0及以上版本支持MaxCompute連接器。

  • MaxCompute連接器僅支持At Least Once語義。

    說明

    At Least Once語義會保證數據不缺失,但在少部分情況下,可能會將重復數據寫入MaxCompute。不同的MaxCompute Tunnel出現重復數據的情況不同,MaxCompute Tunnel詳情請參見如何選擇數據通道?。

  • 默認情況下源表為全量模式,僅會讀取partition參數中指定的分區,在讀完所有數據后結束運行,狀態轉換為finished,不會監控是否有新分區產生。

    如果您需要持續監控新分區,請通過WITH參數中指定startPartition使用增量源表模式。

    說明
    • 維表每次更新時都會檢查最新分區,不受這一限制。

    • 在源表開始運行后,向分區里添加的新數據不會被讀取,請在分區數據完整的情況下運行作業。

語法結構

CREATE TABLE odps_source(
  id INT,
  user_name VARCHAR,
  content VARCHAR
) WITH (
  'connector' = 'odps', 
  'endpoint' = '<yourEndpoint>',
  'tunnelEndpoint' = '<yourTunnelEndpoint>',
  'project' = '<yourProjectName>',
  'schemaName' = '<yourSchemaName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'partition' = 'ds=2018****'
);

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    固定值為odps。

    endpoint

    MaxCompute服務地址。

    String

    請參見Endpoint。

    tunnelEndpoint

    MaxCompute Tunnel服務的連接地址。

    String

    請參見Endpoint。

    說明
    • VPC環境下為必填。

    • 如果未填寫,MaxCompute會根據內部的負載均衡服務分配Tunnel的連接。

    project

    MaxCompute項目名稱。

    String

    無。

    schemaName

    MaxCompute Schema名稱。

    String

    僅當MaxCompute項目開啟Schema功能時,需填寫該值為MaxCompute表所屬Schema名,詳情請參見 Schema操作。

    說明

    僅實時計算引擎VVR 8.0.6及以上版本支持該參數。

    tableName

    MaxCompute表名。

    String

    無。

    accessId

    MaxCompute AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

    accessKey

    MaxCompute AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。

    partition

    MaxCompute分區名。

    String

    對于非分區表和增量源表無需填寫。

    compressAlgorithm

    MaxCompute Tunnel使用的壓縮算法。

    String

    • VVR 4.0.13及以上版本:ZLIB

    • VVR 6.0.1及以上版本:SNAPPY

    參數取值如下:

    • RAW(無壓縮)

    • ZLIB

    • SNAPPY

      SNAPPY相比ZLIB能帶來明顯的吞吐提升。在測試場景下,吞吐提升約50%。

    說明

    僅實時計算引擎VVR 4.0.13及以上版本支持該參數。

    quotaName

    MaxCompute獨享數據傳輸服務的quota名稱。

    String

    設置該值來使用獨享的MaxCompute數據傳輸服務。

    重要
    • 僅實時計算引擎VVR 8.0.3及以上版本支持該參數。

    • 設置該值時,必須刪除tunnelEndpoint參數,否則仍將使用tunnelEndpoint中指定的數據通道。

    說明

    MaxCompute獨享數據傳輸參見購買與使用獨享數據傳輸服務資源組。

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    maxPartitionCount

    可以讀取的最大分區數量。

    Integer

    100

    如果讀取的分區數量超過了該參數,則會出現報錯The number of matched partitions exceeds the default limit。

    重要

    由于一次性讀取大量分區會給MaxCompute服務帶來一定壓力,同時也會讓作業啟動速度變慢,因此您需要確認是否需要讀取這么多分區(而不是誤填partition參數)。如果確實需要,需要手動調大maxPartitionCount參數。

    useArrow

    是否使用Arrow格式讀取數據。

    Boolean

    false

    使用Arrow格式能夠調用MaxCompute的Storage API,詳情請參見什么是MaxCompute中用戶接口與開放性一節。

    重要
    • 僅在批作業中生效。

    • 僅實時計算引擎VVR 8.0.8及以上版本支持該參數。

    splitSize

    在使用Arrow格式讀取數據時,一次拉取的數據大小。

    MemorySize

    256 MB

    僅實時計算引擎VVR 8.0.8及以上版本支持該參數。

    重要

    僅在批作業中生效。

    compressCodec

    在使用Arrow格式讀取數據時,采用的壓縮算法。

    String

    ""

    參數取值如下:

    • "" (無壓縮)

    • ZSTD

    • LZ4_FRAME

    指定壓縮算法相比無壓縮能帶來一定的吞吐提升。

    重要
    • 僅在批作業中生效。

    • 僅實時計算引擎VVR 8.0.8及以上版本支持該參數。

    dynamicLoadBalance

    是否允許動態分配分片。

    Boolean

    false

    參數取值如下:

    • true:允許

    • false:不允許

    允許動態分配分片能夠發揮Flink不同節點的處理性能,減少源表整體讀取時間,但也會導致不同節點讀取總數據量不一致,出現數據傾斜情況。

    重要
    • 僅在批作業中生效。

    • 僅實時計算引擎VVR 8.0.8及以上版本支持該參數。

  • 增量源表獨有

    增量源表通過間歇輪詢MaxCompute服務器獲取所有的分區信息來發現新增的分區,讀取新分區時要求分區內數據已寫入完畢,詳情參見增量MaxCompute源表監聽到新分區時,如果該分區還有數據沒有寫完,如何處理?。通過startPartition可以指定起始點位,但注意只讀取字典序大于等于起始點位的分區,例如分區year=2023,month=10字典序小于分區year=2023,month=9,對于這種類型的分區聲明可以通過加0補齊的方式來保證字典序正確,例如year=2023,month=09。

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    startPartition

    增量讀取的起始MaxCompute分區點位(包含)。

    String

    • 使用該參數后啟用增量源表模式,將忽略partition參數。

    • 多級分區必須按分區級別從大到小聲明每個分區列的值。

    說明

    startPartition參數詳情,請參見如何填寫增量MaxCompute的startPartition參數?。

    subscribeIntervalInSec

    輪詢MaxCompute獲取分區列表的時間間隔。

    Integer

    30

    單位為秒。

    modifiedTableOperation

    讀取分區過程中遇到分區數據被修改時的處理。

    Enum (NONE, SKIP)

    NONE

    由于下載session被保存在檢查點中,每次從檢查點恢復時嘗試從該session恢復讀取進度,而該session由于分區數據被修改不可用,Flink任務會陷入不斷重啟。此時您可以設置該參數,參數取值如下:

    • NONE:需要您修改startPartition參數使其大于不可用分區,并從無狀態啟動作業。

    • SKIP:若不希望無狀態啟動,可將模式修改為SKIP,Flink嘗試從檢查點恢復session時將跳過不可用的分區。

    重要

    NONE和SKIP模式下,被修改分區中已讀取的數據不會被撤回,未讀取的數據將不會被讀取。

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    useStreamTunnel

    是否使用MaxCompute Stream Tunnel上傳數據。

    Boolean

    false

    參數取值如下:

    • true:使用MaxCompute Stream Tunnel上傳數據。

    • false:使用MaxCompute Batch Tunnel上傳數據。

    說明

    flushIntervalMs

    MaxCompute Tunnel Writer緩沖區flush間隔。

    Long

    30000(30秒)

    MaxCompute Sink寫入記錄時,先將數據存儲到MaxCompute的緩沖區中,等緩沖區溢出或者每隔一段時間(flushIntervalMs),再把緩沖區里的數據寫到目標MaxCompute表。

    對于Stream Tunnel,flush的數據立即可見;對于Batch Tunnel,數據flush后仍需要等待checkpoint完成后才可見,建議設置該參數為0來關閉定時flush。

    單位為毫秒。

    說明

    本參數可以與batchSize一同使用,滿足任一條件即會Flush數據。

    batchSize

    MaxCompute Tunnel Writer緩沖區flush的大小。

    Long

    67108864(64 MB)

    MaxCompute Sink寫入記錄時,先將數據存儲到MaxCompute的緩沖區中,等緩沖區達到一定大小(batchSize),再把緩沖區里的數據寫到目標MaxCompute表。

    單位為字節。

    說明
    • 僅實時計算引擎VVR 4.0.14及以上版本支持該參數。

    • 本參數可以與flushIntervalMs一同使用,滿足任一條件即會Flush數據。

    numFlushThreads

    MaxCompute Tunnel Writer緩沖區flush的線程數。

    Integer

    1

    每個MaxCompute Sink并發將創建numFlushThreads個線程用于flush數據。當該值大于1時,將允許不同分區的數據并發Flush,提升Flush的效率。

    說明

    僅實時計算引擎VVR 4.0.14及以上版本支持該參數。

    dynamicPartitionLimit

    寫入動態分區的最大數量。

    Integer

    100

    當結果表在兩次Checkpoint之間寫入的動態分區數量超過了dynamicPartitionLimit,則會出現報錯Too many dynamic partitions

    重要

    由于一次性寫入大量分區會給MaxCompute服務帶來一定壓力,同時也會導致結果表flush和作業Checkpoint變慢。因此當報錯出現時,您需要確認是否需要寫入這么多分區。如果確實需要,需要手動調大dynamicPartitionLimit參數。

    retryTimes

    向MaxCompute服務器請求最大重試次數。

    Integer

    3

    創建session、提交session、flush數據時可能存在短暫的MaxCompute服務不可用的情況,會根據該配置進行重試。

    sleepMillis

    重試間隔時間。

    Integer

    1000

    單位為毫秒。

    enableUpsert

    是否使用MaxCompute Upsert Tunnel上傳數據。

    Boolean

    false

    參數取值如下:

    • true:使用Upsert Tunnel,處理Flink中的INSERT、UPDATE_AFTER和DELETE數據。

    • false:根據useStreamTunnel參數使用Batch Tunnel或Stream Tunnel,處理Flink中的INSERT、UPDATE_AFTER數據。

    重要
    • 若Upsert模式下MaxCompute sink提交時出現報錯、失敗、耗時長等情況,建議限制sink節點的并發數在10以內。

    • 僅實時計算引擎VVR 8.0.6及以上版本支持該參數。

    upsertAsyncCommit

    Upsert模式下在提交session時是否使用異步模式。

    Boolean

    false

    參數取值如下:

    • true:使用異步模式,提交耗時更短,但提交完成時寫入的數據非立即可讀。

    • false:默認為同步模式,提交時將等待服務側處理完session。

    說明

    僅實時計算引擎VVR 8.0.6及以上版本支持該參數。

    upsertCommitTimeoutMs

    Upsert模式下提交session超時時間。

    Integer

    120000

    (120秒)

    僅實時計算引擎VVR 8.0.6及以上版本支持該參數。

  • 維表獨有

    MaxCompute維表在作業啟動時從指定的分區拉取全量數據,partition參數支持使用max_pt()等函數。當緩存過期重新加載時會重新解析partition參數拉取最新的分區,使用max_two_pt()時維表可拉取兩個分區,其他情況下只支持指定單個分區。

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    cache

    緩存策略。

    String

    目前MaxCompute維表僅支持ALL策略,必須顯式聲明。 適用于遠程表數據量小且MISS KEY(源表數據和維表JOIN時,ON條件無法關聯)特別多的場景。

    ALL策略:緩存維表里的所有數據。在Job運行前,系統會將維表中所有數據加載到Cache中,之后所有的維表查詢都會通過Cache進行。如果在Cache中無法找到數據,則KEY不存在,并在Cache過期后重新加載一遍全量Cache。

    說明
    • 因為系統會異步加載維表數據,所以在使用CACHE ALL時,需要增加維表JOIN節點的內存,增加的內存大小為遠程表數據量的至少4倍,具體值與MaxCompute存儲壓縮算法有關。

    • 如果MaxCompute維表數據量較大,可以考慮使用SHUFFLE_HASH注解將維表數據均勻分散到各個并發中。詳情請參見如何使用維表SHUFFLE_HASH注解?

    • 在使用超大MaxCompute維表時,如果JVM頻繁GC導致作業異常,且在增加維表JOIN節點的內存仍無改善的情況下,建議改為支持LRU Cache策略的KV型維表,例如云數據庫Hbase版維表。

    cacheSize

    最多緩存的數據條數。

    Long

    100000

    如果維表數據量超過了cacheSize,則會出現報錯Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit

    重要

    由于維表數據量太大會占用大量JVM堆內存,同時也會讓作業啟動和維表更新變慢,因此您需要確認是否需要緩存這么多數據,如果確實需要,需要手動調大該參數。

    cacheTTLMs

    緩存超時時間,也就是緩存更新的間隔時間。

    Long

    Long.MAX_VALUE(相當于永不更新)

    單位為毫秒。

    cacheReloadTimeBlackList

    更新時間黑名單。在該參數規定的時間段內不會更新緩存。

    String

    用于防止緩存在關鍵時間段(例如活動流量峰值期間)更新導致作業不穩定。填寫方式詳情請參見如何填寫CacheReloadTimeBlackList參數?。

    maxLoadRetries

    緩存更新時(包含作業啟動時初次拉取數據)最多嘗試次數,超過該次數后作業運行失敗。

    Integer

    10

    無。

類型映射

MaxCompute支持的類型參見2.0數據類型版本。

MaxCompute類型

Flink類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(precision, scale)

DECIMAL(precision, scale)

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

STRING

STRING

BINARY

BYTES

DATE

DATE

DATETIME

TIMESTAMP(3)

TIMESTAMP

TIMESTAMP(9)

ARRAY

ARRAY

MAP

MAP

STRUCT

ROW

JSON

STRING

重要

當MaxCompute物理表中同時存在嵌套的復合類型字段(ARRAY、MAP或STRUCT)和JSON類型字段時,需要在創建MaxCompute物理表時指定tblproperties('columnar.nested.type'='true'),才能被Flink正確讀寫。

使用示例

SQL

  • 源表示例

    • 全量讀取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=201809*'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT
         cid,
         COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
    • 增量讀取

      CREATE TEMPORARY TABLE odps_source (
        cid VARCHAR,
        rt DOUBLE
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpointName>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 從20180905對應分區開始讀取
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        cid VARCHAR,
        invoke_count BIGINT
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT cid, COUNT(*) AS invoke_count
      FROM odps_source GROUP BY cid;
  • 結果表示例

    • 寫入固定分區

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id INT,
        len INT,
        content VARCHAR
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905' -- 寫入固定分區ds=20180905。
      );
      
      INSERT INTO odps_sink
      SELECT
        id, len, content
      FROM datagen_source;
    • 寫入動態分區

      CREATE TEMPORARY TABLE datagen_source (
        id INT,
        len INT,
        content VARCHAR,
        c TIMESTAMP
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_sink (
        id  INT,
        len INT,
        content VARCHAR,
        ds VARCHAR --需要顯式聲明動態分區列。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds' --不寫分區的值,表示根據ds字段的值寫入不同分區。
      );
      
      INSERT INTO odps_sink
      SELECT
         id,
         len,
         content,
         DATE_FORMAT(c, 'yyMMdd') as ds
      FROM datagen_source;
  • 維表示例

    • 一對一維表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR,
        PRIMARY KEY (k) NOT ENFORCED  -- 一對一維表需要聲明主鍵。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
    • 一對多維表

      CREATE TEMPORARY TABLE datagen_source (
        k INT,
        v VARCHAR
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE odps_dim (
        k INT,
        v VARCHAR
        -- 一對多維表無需聲明主鍵。
      ) WITH (
        'connector' = 'odps',
        'endpoint' = '<yourEndpoint>',
        'project' = '<yourProjectName>',
        'tableName' = '<yourTableName>',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'partition' = 'ds=20180905',
        'cache' = 'ALL'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        k VARCHAR,
        v1 VARCHAR,
        v2 VARCHAR
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT k, s.v, d.v
      FROM datagen_source AS s
      INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;

DataStream

重要
  • 通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink,DataStream連接器設置方法請參見DataStream連接器使用方法

  • 為了保護知識產權,從實時計算引擎VVR6.0.6版本起,此連接器在本地調試單次運行作業的時間為30分鐘,30分鐘后作業會報錯并退出。本地運行和調試包含MaxCompute連接器的作業參見本地運行和調試包含連接器的作業。

  • 若您在Flink開發控制臺提交作業后,出現本地運行和調試包含連接器的作業中類似的MaxCompute相關類ClassNotFound問題,請下載Maven中央庫中對應版本中后綴為uber.jar的文件,添加為作業的附加依賴。以1.15-vvr-6.0.6版本為例,需下載的文件為該倉庫目錄下的verveica-connector-odps-1.15-vvr-6.0.6-uber.jar。

在DataStream中使用MaxCompute連接器推薦使用SQL聲明MaxCompute表,通過Table/DataStream相互轉換來連接MaxCompute表和數據流

連接源表

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=201809*'",
    ")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source"); 

連接結果表

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
    "\n",
    "CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
    "  cid VARCHAR,",
    "  rt DOUBLE",
    ") WITH (",
    "  'connector' = 'odps',",
    "  'endpoint' = '<yourEndpointName>',",
    "  'project' = '<yourProjectName>',",
    "  'tableName' = '<yourTableName>',",
    "  'accessId' = '<yourAccessId>',",
    "  'accessKey' = '<yourAccessPassword>',",
    "  'partition' = 'ds=20180905'",
    ")");
DataStream<Row> data = env.fromElements(
    Row.of("id0", 3.),
    Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();

XML

MaxCompute連接器的Maven依賴包含了構建全量源表、增量源表、結果表和維表的所需要的類。Maven中央庫中已經放置了MaxCompute DataStream連接器。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-odps</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題