Flink Connector內(nèi)部實(shí)現(xiàn)是通過(guò)緩存并批量由Stream Load導(dǎo)入。本文為您介紹Flink Connector的使用方式及示例。

背景信息

因?yàn)镕link官方只提供了flink-connector-jdbc,不足以滿足導(dǎo)入性能要求,所以新增了flink-connector-starrocks,其內(nèi)部實(shí)現(xiàn)是通過(guò)緩存并批量由Stream Load導(dǎo)入。

使用方式

您可以下載源碼進(jìn)行測(cè)試:下載flink-connector-starrocks源碼

將以下內(nèi)容添加pom.xml中。
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.11, flink-1.12 -->
    <version>x.x.x_flink-1.11</version>
    <!-- for flink-1.13 -->
    <version>x.x.x_flink-1.13</version>
</dependency>
說(shuō)明 您可以在版本信息頁(yè)面,查看Latest Version信息,替換代碼中的x.x.x
代碼示例如下:
  • 方式一
    // -------- sink with raw json string stream --------
    fromElements(new String[]{
        "{\"score\": \"99\", \"name\": \"stephen\"}",
        "{\"score\": \"100\", \"name\": \"lebron\"}"
    }).addSink(
        StarRocksSink.sink(
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "admin")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build()
        )
    );
    
    
    // -------- sink with stream transformation --------
    class RowData {
        public int score;
        public String name;
        public RowData(int score, String name) {
            ......
        }
    }
    fromElements(
        new RowData[]{
            new RowData(99, "stephen"),
            new RowData(100, "lebron")
        }
    ).addSink(
        StarRocksSink.sink(
            // the table structure
            TableSchema.builder()
                .field("score", DataTypes.INT())
                .field("name", DataTypes.VARCHAR(20))
                .build(),
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .build(),
            // set the slots with streamRowData
            (slots, streamRowData) -> {
                slots[0] = streamRowData.score;
                slots[1] = streamRowData.name;
            }
        )
    );
  • 方式二
    // create a table with `structure` and `properties`
    // Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
    tEnv.executeSql(
        "CREATE TABLE USER_RESULT(" +
            "name VARCHAR," +
            "score BIGINT" +
        ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port'," +
            "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
            "'database-name' = 'xxx'," +
            "'table-name' = 'xxx'," +
            "'username' = 'xxx'," +
            "'password' = 'xxx'," +
            "'sink.buffer-flush.max-rows' = '1000000'," +
            "'sink.buffer-flush.max-bytes' = '300000000'," +
            "'sink.buffer-flush.interval-ms' = '5000'," +
            "'sink.properties.column_separator' = '\\x01'," +
            "'sink.properties.row_delimiter' = '\\x02'," +
            "'sink.max-retries' = '3'" +
            "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'`
        ")"
    );
其中,Sink選項(xiàng)如下表所示。
參數(shù)是否必選默認(rèn)值類型描述
connector無(wú)String類型,固定為starrocks。
jdbc-url無(wú)String用于在StarRocks中執(zhí)行查詢操作。
load-url無(wú)String指定FE的IP和HTTP端口,格式為fe_ip:http_port;fe_ip:http_port ,多個(gè)時(shí)使用半角分號(hào)(;)分隔。
database-name無(wú)StringStarRocks數(shù)據(jù)庫(kù)名稱。
table-name無(wú)StringStarRocks表名稱。
username無(wú)StringStarRocks連接用戶名。
password無(wú)StringStarRocks連接密碼。
sink.semanticat-least-onceString取值為at-least-once或exactly-once。
sink.buffer-flush.max-bytes94371840(90M)StringBuffer可容納的最大數(shù)據(jù)量,取值范圍為64 MB~10 GB。
sink.buffer-flush.max-rows500000StringBuffer可容納的最大數(shù)據(jù)行數(shù),取值范圍為64,000~5000,000。
sink.buffer-flush.interval-ms300000StringBuffer刷新時(shí)間間隔,取值范圍為 1000 ms~3600000 ms。
sink.max-retries1String最大重試次數(shù),取值范圍為0~10。
sink.connect.timeout-ms1000String連接到load-url的超時(shí)時(shí)間,取值范圍為100~60000。
sink.properties.*無(wú)StringSink屬性。
重要
  • 為了保證Sink數(shù)據(jù)的Exactly-once語(yǔ)義,需要外部系統(tǒng)的Two-phase Commit機(jī)制。由于StarRocks無(wú)此機(jī)制,所以需要依賴Flink的checkpoint-interval,在每次Checkpoint時(shí)保存批數(shù)據(jù)以及其Label,在Checkpoint完成后的第一次invoke中阻塞flush所有緩存在state中的數(shù)據(jù),以此達(dá)到Exactly-once。但如果StarRocks終止了,會(huì)導(dǎo)致您的Flink Sink Stream算子長(zhǎng)時(shí)間阻塞,并引起Flink的監(jiān)控報(bào)警或強(qiáng)制退出。
  • 默認(rèn)使用CSV格式進(jìn)行導(dǎo)入,您可以通過(guò)指定sink.properties.row_delimiter(該參數(shù)自StarRocks 1.15.0版本開(kāi)始支持)為\\x02,sink.properties.column_separator為\\x01,來(lái)自定義行分隔符與列分隔符。
  • 如果遇到導(dǎo)入停止的情況,請(qǐng)嘗試增加Flink任務(wù)的內(nèi)存。
  • 如果代碼運(yùn)行正常且能接收到數(shù)據(jù),但是寫入不成功時(shí),請(qǐng)確認(rèn)當(dāng)前機(jī)器是否能訪問(wèn)BE的http_port端口,即能ping通BE服務(wù)使用的IP地址。

    例如,如果一臺(tái)機(jī)器有外網(wǎng)和內(nèi)網(wǎng)IP,且FE或BE的http_port均可通過(guò)外網(wǎng)ip:port訪問(wèn),集群里綁定的IP為內(nèi)網(wǎng)IP,任務(wù)里loadurl寫的FE外網(wǎng)ip:http_port,F(xiàn)E會(huì)將寫入任務(wù)轉(zhuǎn)發(fā)給BE內(nèi)網(wǎng)ip:port,此時(shí)如果Client機(jī)器ping不通BE的內(nèi)網(wǎng)IP就會(huì)寫入失敗。

示例

詳情請(qǐng)參見(jiàn)使用Flink CDC同步MySQL數(shù)據(jù)至StarRocks