日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

云消息隊列 RocketMQ 版

本文為您介紹云消息隊列 RocketMQ 版連接器。

重要

鑒于云消息隊列 RocketMQ 版 4.x標(biāo)準(zhǔn)版實例共享API調(diào)用彈性上限為每秒5000次,使用該版本的消息中間件在與實時計算Flink版對接時,若超過上限會觸發(fā)限流機(jī)制,可能會導(dǎo)致Flink作業(yè)運行不穩(wěn)定。因此,在選擇消息中間件時,如果您正在或計劃通過標(biāo)準(zhǔn)版RocketMQ與Flink對接,請您謹(jǐn)慎評估。如果業(yè)務(wù)場景允許,請考慮使用Kafka、日志服務(wù)(SLS)或DataHub等其他中間件進(jìn)行替代。如果您確實需要使用云消息隊列 RocketMQ 版 4.x標(biāo)準(zhǔn)版處理大規(guī)模的消息,也請同時通過提交工單與RocketMQ產(chǎn)品取得聯(lián)系申請?zhí)嵘匏偕舷蕖?/p>

背景信息

云消息隊列 RocketMQ 版是阿里云基于Apache RocketMQ構(gòu)建的低延遲、高并發(fā)、高可用和高可靠的分布式消息中間件。其既可為分布式應(yīng)用系統(tǒng)提供異步解耦和削峰填谷的能力,同時也具備互聯(lián)網(wǎng)應(yīng)用所需的海量消息堆積、高吞吐和可靠重試等特性。

RocketMQ連接器支持的信息如下。

類別

詳情

支持類型

源表和結(jié)果表

運行模式

僅支持流模式

數(shù)據(jù)格式

CSV和二進(jìn)制格式

特有監(jiān)控指標(biāo)

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • 結(jié)果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

說明

指標(biāo)含義詳情,請參見監(jiān)控指標(biāo)說明

API種類

Datastream(僅支持RocketMQ 4.x)和SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

不支持刪除結(jié)果表數(shù)據(jù),只支持插入和更新數(shù)據(jù)。

特色功能

RocketMQ源表和結(jié)果表支持屬性字段,具體如下。

  • 源表屬性字段

    說明

    僅在VVR 3.0.1及以上版本支持獲取以下RocketMQ屬性字段。

    字段名

    字段類型

    說明

    topic

    VARCHAR METADATA VIRTUAL

    消息Topic。

    queue-id

    INT METADATA VIRTUAL

    消息隊列ID。

    queue-offset

    BIGINT METADATA VIRTUAL

    消息隊列的消費位點。

    msg-id

    VARCHAR METADATA VIRTUAL

    消息ID。

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息存儲時間。

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息生成時間。

    keys

    VARCHAR METADATA VIRTUAL

    消息Keys。

    tags

    VARCHAR METADATA VIRTUAL

    消息Tags。

  • 結(jié)果表屬性字段

    說明

    僅實時計算引擎VVR 4.0.0及以上版本支持以下RocketMQ屬性字段。

    字段名

    字段類型

    說明

    keys

    VARCHAR METADATA

    消息Keys。

    tags

    VARCHAR METADATA

    消息Tags。

前提條件

已創(chuàng)建了RocketMQ資源,詳情請參見創(chuàng)建資源

使用限制

  • 僅Flink實時計算引擎VVR 2.0.0及以上版本支持RocketMQ連接器。

  • 僅Flink實時計算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。

  • 在Flink實時計算引擎VVR 6.0.2以下版本,源表的并發(fā)度必須小于等于RocketMQ topic的分區(qū)數(shù),在實時計算引擎VVR 6.0.2及以上版本解除該限制。您可以提前設(shè)置大于分區(qū)數(shù)的并發(fā)度,不需要因RocketMQ的縮容而手動調(diào)整作業(yè)并發(fā)度。

  • RocketMQ連接器使用Pull Consumer消費,所有的子任務(wù)分擔(dān)消費。

語法結(jié)構(gòu)

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH參數(shù)

  • 通用

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    connector

    connector類型。

    String

    • RocketMQ 4.x固定值為mq

    • RocketMQ 5.x固定值為mq5

    endPoint

    EndPoint地址

    String

    云消息隊列 RocketMQ 版接入地址支持以下兩種類型:

    • VVR 3.0.1及以上版本的作業(yè):需要使用TCP協(xié)議客戶端接入點,詳情請參見

      • 內(nèi)網(wǎng)服務(wù)MQ(阿里云經(jīng)典網(wǎng)絡(luò)/VPC)接入地址:在MQ控制臺目標(biāo)實例詳情中,選擇接入點 > TCP協(xié)議客戶端接入點 > 內(nèi)網(wǎng)訪問,獲取對應(yīng)的EndPoint。

      • 公網(wǎng)服務(wù)MQ接入地址:在MQ控制臺目標(biāo)實例詳情中,選擇接入點 > TCP協(xié)議 > 客戶端接入點 > 公網(wǎng)訪問,獲取對應(yīng)的EndPoint。

      重要

      由于阿里云網(wǎng)絡(luò)安全策略動態(tài)變化,實時計算連接公網(wǎng)服務(wù)MQ時可能會出現(xiàn)網(wǎng)絡(luò)連接問題,推薦您使用內(nèi)網(wǎng)服務(wù)MQ。

      • 內(nèi)網(wǎng)服務(wù)無法跨域訪問。例如,您所購買的實時計算服務(wù)的地域為華東1,但是購買的RocketMQ服務(wù)的地域為華東2(上海),則無法訪問。

      • 通過公網(wǎng)方式訪問RocketMQ,需要配置NAT,詳情請參見創(chuàng)建和管理公網(wǎng)NAT網(wǎng)關(guān)實例

    • VVR 3.0.1以下版本的作業(yè):RocketMQ舊的接入點已不可用,您需要適配升級實時計算作業(yè)。

      重要

      如果您已使用了VVR 3.0.1以下版本的RocketMQ連接器,則您需要將您的實時計算作業(yè)升級至VVR 3.0.1及以上版本,并將作業(yè)中EndPoint參數(shù)取值更改為新的RocketMQ接入點,舊的接入點存在穩(wěn)定性風(fēng)險或不可用的問題,詳情請參見實時計算Flink版產(chǎn)品公告

    topic

    topic名稱。

    String

    無。

    accessId

    • 4.x:阿里云賬號的AccessKey ID。

    • 5.x:

      RocketMQ實例用戶名

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

    • RocketMQ 5.x:如果是使用公網(wǎng)接入點訪問,需配置為RocketMQ控制臺實例用戶名。如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置。

    accessKey

    • 4.x: 阿里云賬號的AccessKey Secret。

    • 5.x:實例密碼

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

    • RocketMQ 5.x:如果是使用公網(wǎng)接入點訪問,需配置為RocketMQ控制臺實例密碼。如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置。

    tag

    訂閱或?qū)懭氲臉?biāo)簽

    String

    • RocketMQ作為源表時,只能讀取單個tag。

    • RocketMQ作為結(jié)果表時,支持設(shè)置多個tag,以逗號(,)進(jìn)行分隔。

    說明

    當(dāng)作為結(jié)果表時,僅支持RocketMQ 4.x。RocketMQ 5.x請使用結(jié)果表屬性字段來指定寫出消息的 tag。

    nameServerSubgroup

    NameServer組。

    String

    • 內(nèi)網(wǎng)服務(wù)(阿里云經(jīng)典網(wǎng)絡(luò)或VPC):必須配置'nameServerSubgroup' = 'nsaddr4client-internal'

    • 公網(wǎng)服務(wù):無需配置nameServerSubgroup

    說明

    僅VVR 2.1.1-VVR 3.0.0版本支持該參數(shù),VVR 3.0.1及以后版本不支持該參數(shù)。

    encoding

    編碼格式。

    String

    UTF-8

    無。

    instanceID

    RocketMQ實例ID。

    String

    • 如果RocketMQ實例無獨立命名空間,則不可以使用instanceID參數(shù)。

    • 如果RocketMQ實例有獨立命名空間,則instanceID參數(shù)必選。

    說明

    僅RocketMQ 4.x支持該參數(shù),RocketMQ 5.x不需要配置該參數(shù)。

  • 源表獨有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    consumerGroup

    Consumer組名。

    String

    無。

    pullIntervalMs

    上游沒有數(shù)據(jù)可供消費時,source的休眠時間。

    Int

    單位為毫秒。

    目前沒有限流機(jī)制,無法設(shè)置讀取RocketMQ的速率。

    說明

    僅RocketMQ 4.x支持該參數(shù),RocketMQ 5.x不需要配置該參數(shù)。

    timeZone

    時區(qū)。

    String

    例如,Asia/Shanghai。

    startTimeMs

    啟動時間點。

    Long

    時間戳,單位為毫秒。

    startMessageOffset

    消息開始的偏移量。

    Int

    如果填寫該參數(shù),則優(yōu)先以startMessageOffset的位點開始加載數(shù)據(jù)。

    lineDelimiter

    解析Block時,行分隔符。

    String

    \n

    無。

    fieldDelimiter

    字段分隔符。

    String

    \u0001

    根據(jù)MQ終端的模式,分隔符分別為:

    • 在只讀模式下(默認(rèn)模式),分隔符為\u0001。該模式下,分隔符不可見。

    • 在編輯模式下,分隔符為^A

    lengthCheck

    單行字段條數(shù)檢查策略。

    Int

    NONE

    取值如下:

    • NONE:默認(rèn)值。

      • 解析出的字段數(shù)大于定義字段數(shù)時,按從左到右的順序,取定義字段數(shù)量的數(shù)據(jù)。

      • 解析出的字段數(shù)小于定義字段數(shù)時,跳過這行數(shù)據(jù)。

    • SKIP:解析出的字段數(shù)和定義字段數(shù)不同時跳過數(shù)據(jù)。

    • EXCEPTION:解析出的字段數(shù)和定義字段數(shù)不同時提示異常。

    • PAD:按從左到右順序填充。

      • 解析出的字段數(shù)大于定義字段數(shù)時,按從左到右的順序,取定義字段數(shù)量的數(shù)據(jù)。

      • 解析出的字段數(shù)小于定義字段數(shù)時,在行尾用Null填充缺少的字段。

    說明

    SKIP、EXCEPTION和PAD為可選值。

    columnErrorDebug

    是否打開調(diào)試開關(guān)。

    Boolean

    false

    如果設(shè)置為true,則打印解析異常的Log。

    pullBatchSize

    每次拉取消息的最大數(shù)量。

    Int

    64

    僅實時計算引擎VVR 8.0.7及以上版本支持該參數(shù)。

  • 結(jié)果表獨有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    producerGroup

    寫入的群組。

    String

    無。

    retryTimes

    寫入的重試次數(shù)。

    Int

    10

    無。

    sleepTimeMs

    重試間隔時間。

    Long

    5000

    無。

    partitionField

    指定字段名,將該字段作為分區(qū)列。

    String

    如果modepartition,則該參數(shù)必填。

    說明

    僅實時計算引擎VVR 8.0.5及以上版本支持該參數(shù)。

類型映射

Flink字段類型

云消息隊列RocketMQ字段類型

VARCHAR

STRING

代碼示例

  • 源表示例

    • CSV格式

      假設(shè)您的一條CSV格式消息記錄如下。

      1,name,male 
      2,name,female
      說明

      一條RocketMQ消息可以包括零條到多條數(shù)據(jù)記錄,記錄之間使用\n分隔。

      Flink作業(yè)中,聲明RocketMQ數(shù)據(jù)源表的DDL如下。

      • RocketMQ 5.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq5',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
      • RocketMQ 4.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '1000',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
    • 二進(jìn)制格式

      • RocketMQ 5.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq5',
          'endpoint' = '<yourEndpoint>',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;

      • RocketMQ 4.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq',
          'endpoint' = '<yourEndpoint>',
          'pullIntervalMs' = '500',
          'accessId' = '${secret_values.ak_id}',
          'accessKey' = '${secret_values.ak_secret}',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;
  • 結(jié)果表示例

    • 創(chuàng)建結(jié)果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
        說明

        如果您的MQ消息為二進(jìn)制格式,則DDL中只能定義一個字段,且字段類型必須為VARBINARY。

    • 創(chuàng)建將keystags字段指定為RocketMQ消息的key和tag的結(jié)果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );

DataStream API

重要

通過DataStream的方式讀寫數(shù)據(jù)時,則需要使用對應(yīng)的DataStream連接器連接Flink全托管,DataStream連接器設(shè)置方法請參見DataStream連接器使用方法

實時計算引擎VVR提供MetaQSource,用于讀取RocketMQ;提供OutputFormat的實現(xiàn)類MetaQOutputFormat,用于寫入RocketMQ。讀取RocketMQ和寫入RocketMQ的示例如下:

RocketMQ 4.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調(diào)試時使用,需要在作業(yè)打包上傳到阿里云實時計算Flink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consumer
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}

RocketMQ 5.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調(diào)試時使用,需要在作業(yè)打包上傳到阿里云實時計算Flink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

XML

Maven中央庫中已經(jīng)放置了MQ DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
說明

RocketMQ接入點Endpoint配置詳情請參見關(guān)于TCP內(nèi)網(wǎng)接入點設(shè)置的公告

常見問題

RocketMQ Topic擴(kuò)容時,RocketMQ如何感知Topic分區(qū)數(shù)變化?