云消息隊列 RocketMQ 版
本文為您介紹云消息隊列 RocketMQ 版連接器。
鑒于云消息隊列 RocketMQ 版 4.x標(biāo)準(zhǔn)版實例共享API調(diào)用彈性上限為每秒5000次,使用該版本的消息中間件在與實時計算Flink版對接時,若超過上限會觸發(fā)限流機(jī)制,可能會導(dǎo)致Flink作業(yè)運行不穩(wěn)定。因此,在選擇消息中間件時,如果您正在或計劃通過標(biāo)準(zhǔn)版RocketMQ與Flink對接,請您謹(jǐn)慎評估。如果業(yè)務(wù)場景允許,請考慮使用Kafka、日志服務(wù)(SLS)或DataHub等其他中間件進(jìn)行替代。如果您確實需要使用云消息隊列 RocketMQ 版 4.x標(biāo)準(zhǔn)版處理大規(guī)模的消息,也請同時通過提交工單與RocketMQ產(chǎn)品取得聯(lián)系申請?zhí)嵘匏偕舷蕖?/p>
背景信息
云消息隊列 RocketMQ 版是阿里云基于Apache RocketMQ構(gòu)建的低延遲、高并發(fā)、高可用和高可靠的分布式消息中間件。其既可為分布式應(yīng)用系統(tǒng)提供異步解耦和削峰填谷的能力,同時也具備互聯(lián)網(wǎng)應(yīng)用所需的海量消息堆積、高吞吐和可靠重試等特性。
RocketMQ連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表和結(jié)果表 |
運行模式 | 僅支持流模式 |
數(shù)據(jù)格式 | CSV和二進(jìn)制格式 |
特有監(jiān)控指標(biāo) |
說明 指標(biāo)含義詳情,請參見監(jiān)控指標(biāo)說明。 |
API種類 | Datastream(僅支持RocketMQ 4.x)和SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 不支持刪除結(jié)果表數(shù)據(jù),只支持插入和更新數(shù)據(jù)。 |
特色功能
RocketMQ源表和結(jié)果表支持屬性字段,具體如下。
源表屬性字段
說明僅在VVR 3.0.1及以上版本支持獲取以下RocketMQ屬性字段。
字段名
字段類型
說明
topic
VARCHAR METADATA VIRTUAL
消息Topic。
queue-id
INT METADATA VIRTUAL
消息隊列ID。
queue-offset
BIGINT METADATA VIRTUAL
消息隊列的消費位點。
msg-id
VARCHAR METADATA VIRTUAL
消息ID。
store-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息存儲時間。
born-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息生成時間。
keys
VARCHAR METADATA VIRTUAL
消息Keys。
tags
VARCHAR METADATA VIRTUAL
消息Tags。
結(jié)果表屬性字段
說明僅實時計算引擎VVR 4.0.0及以上版本支持以下RocketMQ屬性字段。
字段名
字段類型
說明
keys
VARCHAR METADATA
消息Keys。
tags
VARCHAR METADATA
消息Tags。
前提條件
已創(chuàng)建了RocketMQ資源,詳情請參見創(chuàng)建資源。
使用限制
僅Flink實時計算引擎VVR 2.0.0及以上版本支持RocketMQ連接器。
僅Flink實時計算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。
在Flink實時計算引擎VVR 6.0.2以下版本,源表的并發(fā)度必須小于等于RocketMQ topic的分區(qū)數(shù),在實時計算引擎VVR 6.0.2及以上版本解除該限制。您可以提前設(shè)置大于分區(qū)數(shù)的并發(fā)度,不需要因RocketMQ的縮容而手動調(diào)整作業(yè)并發(fā)度。
RocketMQ連接器使用Pull Consumer消費,所有的子任務(wù)分擔(dān)消費。
語法結(jié)構(gòu)
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);
WITH參數(shù)
通用
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
connector
connector類型。
String
是
無
RocketMQ 4.x固定值為
mq
。RocketMQ 5.x固定值為
mq5
。
endPoint
EndPoint地址
String
是
無
云消息隊列 RocketMQ 版接入地址支持以下兩種類型:
VVR 3.0.1及以上版本的作業(yè):需要使用TCP協(xié)議客戶端接入點,詳情請參見
內(nèi)網(wǎng)服務(wù)MQ(阿里云經(jīng)典網(wǎng)絡(luò)/VPC)接入地址:在MQ控制臺目標(biāo)實例詳情中,選擇
,獲取對應(yīng)的EndPoint。公網(wǎng)服務(wù)MQ接入地址:在MQ控制臺目標(biāo)實例詳情中,選擇
,獲取對應(yīng)的EndPoint。
重要由于阿里云網(wǎng)絡(luò)安全策略動態(tài)變化,實時計算連接公網(wǎng)服務(wù)MQ時可能會出現(xiàn)網(wǎng)絡(luò)連接問題,推薦您使用內(nèi)網(wǎng)服務(wù)MQ。
內(nèi)網(wǎng)服務(wù)無法跨域訪問。例如,您所購買的實時計算服務(wù)的地域為華東1,但是購買的RocketMQ服務(wù)的地域為華東2(上海),則無法訪問。
通過公網(wǎng)方式訪問RocketMQ,需要配置NAT,詳情請參見創(chuàng)建和管理公網(wǎng)NAT網(wǎng)關(guān)實例。
VVR 3.0.1以下版本的作業(yè):RocketMQ舊的接入點已不可用,您需要適配升級實時計算作業(yè)。
重要如果您已使用了VVR 3.0.1以下版本的RocketMQ連接器,則您需要將您的實時計算作業(yè)升級至VVR 3.0.1及以上版本,并將作業(yè)中EndPoint參數(shù)取值更改為新的RocketMQ接入點,舊的接入點存在穩(wěn)定性風(fēng)險或不可用的問題,詳情請參見實時計算Flink版產(chǎn)品公告
topic
topic名稱。
String
是
無
無。
accessId
4.x:阿里云賬號的AccessKey ID。
5.x:
RocketMQ實例用戶名
String
RocketMQ 4.x:是
RocketMQ 5.x:否
無
RocketMQ 4.x:詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理。
RocketMQ 5.x:如果是使用公網(wǎng)接入點訪問,需配置為RocketMQ控制臺實例用戶名。如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置。
accessKey
4.x: 阿里云賬號的AccessKey Secret。
5.x:實例密碼
String
RocketMQ 4.x:是
RocketMQ 5.x:否
無
RocketMQ 4.x:詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。
RocketMQ 5.x:如果是使用公網(wǎng)接入點訪問,需配置為RocketMQ控制臺實例密碼。如果是在阿里云ECS內(nèi)網(wǎng)訪問,無需填寫該配置。
tag
訂閱或?qū)懭氲臉?biāo)簽
String
否
無
RocketMQ作為源表時,只能讀取單個tag。
RocketMQ作為結(jié)果表時,支持設(shè)置多個tag,以逗號(,)進(jìn)行分隔。
說明當(dāng)作為結(jié)果表時,僅支持RocketMQ 4.x。RocketMQ 5.x請使用結(jié)果表屬性字段來指定寫出消息的 tag。
nameServerSubgroup
NameServer組。
String
否
無
內(nèi)網(wǎng)服務(wù)(阿里云經(jīng)典網(wǎng)絡(luò)或VPC):必須配置
'nameServerSubgroup' = 'nsaddr4client-internal'
。公網(wǎng)服務(wù):無需配置
nameServerSubgroup
。
說明僅VVR 2.1.1-VVR 3.0.0版本支持該參數(shù),VVR 3.0.1及以后版本不支持該參數(shù)。
encoding
編碼格式。
String
否
UTF-8
無。
instanceID
RocketMQ實例ID。
String
否
無
如果RocketMQ實例無獨立命名空間,則不可以使用instanceID參數(shù)。
如果RocketMQ實例有獨立命名空間,則instanceID參數(shù)必選。
說明僅RocketMQ 4.x支持該參數(shù),RocketMQ 5.x不需要配置該參數(shù)。
源表獨有
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
consumerGroup
Consumer組名。
String
是
無
無。
pullIntervalMs
上游沒有數(shù)據(jù)可供消費時,source的休眠時間。
Int
是
無
單位為毫秒。
目前沒有限流機(jī)制,無法設(shè)置讀取RocketMQ的速率。
說明僅RocketMQ 4.x支持該參數(shù),RocketMQ 5.x不需要配置該參數(shù)。
timeZone
時區(qū)。
String
否
無
例如,Asia/Shanghai。
startTimeMs
啟動時間點。
Long
否
無
時間戳,單位為毫秒。
startMessageOffset
消息開始的偏移量。
Int
否
無
如果填寫該參數(shù),則優(yōu)先以
startMessageOffset
的位點開始加載數(shù)據(jù)。lineDelimiter
解析Block時,行分隔符。
String
否
\n
無。
fieldDelimiter
字段分隔符。
String
否
\u0001
根據(jù)MQ終端的模式,分隔符分別為:
在只讀模式下(默認(rèn)模式),分隔符為
\u0001
。該模式下,分隔符不可見。在編輯模式下,分隔符為
^A
。
lengthCheck
單行字段條數(shù)檢查策略。
Int
否
NONE
取值如下:
NONE:默認(rèn)值。
解析出的字段數(shù)大于定義字段數(shù)時,按從左到右的順序,取定義字段數(shù)量的數(shù)據(jù)。
解析出的字段數(shù)小于定義字段數(shù)時,跳過這行數(shù)據(jù)。
SKIP:解析出的字段數(shù)和定義字段數(shù)不同時跳過數(shù)據(jù)。
EXCEPTION:解析出的字段數(shù)和定義字段數(shù)不同時提示異常。
PAD:按從左到右順序填充。
解析出的字段數(shù)大于定義字段數(shù)時,按從左到右的順序,取定義字段數(shù)量的數(shù)據(jù)。
解析出的字段數(shù)小于定義字段數(shù)時,在行尾用Null填充缺少的字段。
說明SKIP、EXCEPTION和PAD為可選值。
columnErrorDebug
是否打開調(diào)試開關(guān)。
Boolean
否
false
如果設(shè)置為true,則打印解析異常的Log。
pullBatchSize
每次拉取消息的最大數(shù)量。
Int
否
64
僅實時計算引擎VVR 8.0.7及以上版本支持該參數(shù)。
結(jié)果表獨有
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
producerGroup
寫入的群組。
String
是
無
無。
retryTimes
寫入的重試次數(shù)。
Int
否
10
無。
sleepTimeMs
重試間隔時間。
Long
否
5000
無。
partitionField
指定字段名,將該字段作為分區(qū)列。
String
否
無
如果
mode
為partition
,則該參數(shù)必填。說明僅實時計算引擎VVR 8.0.5及以上版本支持該參數(shù)。
類型映射
Flink字段類型 | 云消息隊列RocketMQ字段類型 |
VARCHAR | STRING |
代碼示例
源表示例
CSV格式
假設(shè)您的一條CSV格式消息記錄如下。
1,name,male 2,name,female
說明一條RocketMQ消息可以包括零條到多條數(shù)據(jù)記錄,記錄之間使用
\n
分隔。Flink作業(yè)中,聲明RocketMQ數(shù)據(jù)源表的DDL如下。
RocketMQ 5.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );
RocketMQ 4.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );
二進(jìn)制格式
RocketMQ 5.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
RocketMQ 4.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
結(jié)果表示例
創(chuàng)建結(jié)果表
RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
說明如果您的MQ消息為二進(jìn)制格式,則DDL中只能定義一個字段,且字段類型必須為VARBINARY。
創(chuàng)建將
keys
和tags
字段指定為RocketMQ消息的key和tag的結(jié)果表RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
DataStream API
通過DataStream的方式讀寫數(shù)據(jù)時,則需要使用對應(yīng)的DataStream連接器連接Flink全托管,DataStream連接器設(shè)置方法請參見DataStream連接器使用方法。
實時計算引擎VVR提供MetaQSource
,用于讀取RocketMQ;提供OutputFormat
的實現(xiàn)類MetaQOutputFormat
,用于寫入RocketMQ。讀取RocketMQ和寫入RocketMQ的示例如下:
RocketMQ 4.x
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下兩個配置僅本地調(diào)試時使用,需要在作業(yè)打包上傳到阿里云實時計算Flink版之前刪除
conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Creates and adds RocketMQ source.
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// Converts message body to upper case.
.map(RocketMQDataStreamDemo2::convertMessages)
// Creates and adds RocketMQ sink.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo2.class.getSimpleName());
// Compiles and submits job.
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // always null
null, // tag of the messages to consumer
Long.MAX_VALUE, // stop timestamp in milliseconds
-1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
0, // Start offset.
300_000, // Partition discover interval.
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
}
}
RocketMQ 5.x
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下兩個配置僅本地調(diào)試時使用,需要在作業(yè)打包上傳到阿里云實時計算Flink版之前刪除
conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
out.collect(new String(messageExt.getBody()));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
message.setBody(s.getBytes());
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}
XML
Maven中央庫中已經(jīng)放置了MQ DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>
RocketMQ接入點Endpoint配置詳情請參見關(guān)于TCP內(nèi)網(wǎng)接入點設(shè)置的公告。