DataX Writer插件實現(xiàn)了寫入數(shù)據(jù)到StarRocks目的表的功能。在底層實現(xiàn)上,DataX Writer通過Stream Load以CSV或JSON格式導(dǎo)入數(shù)據(jù)至StarRocks。內(nèi)部將Reader讀取的數(shù)據(jù)進行緩存后批量導(dǎo)入至StarRocks,以提高寫入性能。阿里云DataWorks已經(jīng)集成了DataX導(dǎo)入的能力,可以同步MaxCompute數(shù)據(jù)到EMR StarRocks。本文為您介紹DataX Writer原理,以及如何使用DataWorks進行離線同步任務(wù)。

背景信息

DataX Writer總體的數(shù)據(jù)流為Source -> Reader -> DataX channel -> Writer -> StarRocks。

功能說明

環(huán)境準(zhǔn)備

您可以下載DataX插件DataX源碼進行測試:

測試時可以使用命令python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json

配置樣例

從MySQL讀取數(shù)據(jù)后導(dǎo)入至StarRocks。
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "column": [ "k1", "k2", "v1", "v2" ],
                        "connection": [
                            {
                                "table": [ "table1", "table2" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test1"
                                ]
                            },
                            {
                                "table": [ "table3", "table4" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test2"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "database": "xxxx",
                        "table": "xxxx",
                        "column": ["k1", "k2", "v1", "v2"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
                        "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030""],
                        "loadProps": {}
                    }
                }
            }
        ]
    }
}
相關(guān)參數(shù)描述如下表所示。
參數(shù)描述是否必選默認(rèn)值
usernameStarRocks數(shù)據(jù)庫的用戶名。
passwordStarRocks數(shù)據(jù)庫的密碼。
databaseStarRocks數(shù)據(jù)庫的名稱。
tableStarRocks表的名稱。
loadUrlStarRocks FE的地址,用于Stream Load,可以為多個FE地址,格式為fe_ip:fe_http_port
column目的表需要寫入數(shù)據(jù)的字段,字段之間用英文逗號(,)分隔。例如,"column": ["id","name","age"]
重要 該參數(shù)必須指定。如果希望導(dǎo)入所有字段,可以使用["*"]
preSql寫入數(shù)據(jù)到目的表前,會先執(zhí)行設(shè)置的標(biāo)準(zhǔn)語句。
postSql寫入數(shù)據(jù)到目的表后,會先執(zhí)行設(shè)置的標(biāo)準(zhǔn)語句。
jdbcUrl目的數(shù)據(jù)庫的JDBC連接信息,用于執(zhí)行preSqlpostSql
maxBatchRows單次Stream Load導(dǎo)入的最大行數(shù)。500000(50W)
maxBatchSize單次Stream Load導(dǎo)入的最大字節(jié)數(shù)。104857600 (100M)
flushInterval上一次Stream Load結(jié)束至下一次開始的時間間隔。單位為ms。300000(ms)
loadPropsStream Load的請求參數(shù),詳情請參見Stream Load

類型轉(zhuǎn)換

默認(rèn)傳入的數(shù)據(jù)均會被轉(zhuǎn)為字符串,并以\t作為列分隔符,\n作為行分隔符,組成CSV文件進行Stream Load導(dǎo)入操作。類型轉(zhuǎn)換示例如下:
  • 更改列分隔符,則loadProps配置如下。
    "loadProps": {
        "column_separator": "\\x01",
        "row_delimiter": "\\x02"
    }
  • 更改導(dǎo)入格式為JSON,則loadProps配置如下。
    "loadProps": {
        "format": "json",
        "strip_outer_array": true
    }

導(dǎo)入案例

重要 請確保RDS MySQL和StarRocks實例在同一個網(wǎng)絡(luò)VPC和VSW下。
  • 創(chuàng)建MySQL源數(shù)據(jù)表
    create table `sr_db`.sr_table(id int, name varchar (1024) ,event_time  DATETIME);
    insert into `sr_db`.sr_table values (1,"aaa","2015-09-12 00:00:00"),(2,"bbb","2015-09-12 00:00:00");
  • 創(chuàng)建StarRocks數(shù)據(jù)表
    CREATE TABLE IF NOT EXISTS load_db.datax_into_tbl (
      id          INT,
      name           STRING,
      event_time  DATETIME
    ) ENGINE=OLAP
    DUPLICATE KEY(id, name)
    DISTRIBUTED BY HASH(id, name) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  • 創(chuàng)建同步任務(wù)
    {
        "job": {
            "setting": {
                "speed": {
                     "channel": 1
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "username",
                            "password": "***",
                            "column": [ "id", "name", "event_time" ],
                            "connection": [
                                {
                                    "table": [ "sr_table"],
                                    "jdbcUrl": [
                                         "jdbc:mysql://rm-*****.mysql.rds.aliyuncs.com:3306/sr_db"
                                    ]
                                }
                            ]
                        }
                    },
                   "writer": {
                        "name": "starrockswriter",
                        "parameter": {
                            "username": "admin",
                            "password": "****",
                            "database": "load_db",
                            "table": "datax_load_tbl",
                            "column": ["id", "name", "event_time"],
                            "preSql": [],
                            "postSql": [],
                            "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
                            "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030"],
                            "loadProps": {}
                        }
                    }
                }
            ]
        }
    }
    返回信息如下所示。
    任務(wù)啟動時刻                    : 2023-04-07 13:05:55
    任務(wù)結(jié)束時刻                    : 2023-04-07 13:06:05
    任務(wù)總計耗時                    :                 10s
    任務(wù)平均流量                    :                2B/s
    記錄寫入速度                    :              0rec/s
    讀出記錄總數(shù)                    :                   2
    讀寫失敗總數(shù)                    :                   0

DataWorks離線同步使用方式

  1. 在DataWorks上創(chuàng)建工作空間,詳情請參見創(chuàng)建工作空間
  2. 在DataWorks上創(chuàng)建測試表并上傳數(shù)據(jù)到MaxCompute數(shù)據(jù)源,詳情請參見建表并上傳數(shù)據(jù)
  3. 創(chuàng)建StarRocks數(shù)據(jù)源。
    1. 在DataWorks的工作空間列表頁面,單擊目標(biāo)工作空間操作列的數(shù)據(jù)集成
    2. 在左側(cè)導(dǎo)航欄,單擊數(shù)據(jù)源
    3. 單擊右上角的新增數(shù)據(jù)源
    4. 新增數(shù)據(jù)源對話框中,新增StarRocks類型的數(shù)據(jù)源。
      add_StarRocks
  4. 創(chuàng)建離線同步任務(wù)流程。
    1. 新建業(yè)務(wù)流程,詳情請參見創(chuàng)建業(yè)務(wù)流程
    2. 在目錄業(yè)務(wù)流程,新建離線同步任務(wù),詳情請參見創(chuàng)建離線同步節(jié)點
      add_node
  5. 在StarRocks實例中查看數(shù)據(jù),詳情請參見查看元數(shù)據(jù)