本文介紹如何使用Python SDK通過接入點接入云消息隊列 Kafka 版并收發消息。
環境準備
添加Python依賴庫
執行以下命令安裝依賴庫。
pip install confluent-kafka==1.9.2
建議您安裝confluent-kafka 1.9.2及以下版本的依賴庫,否則使用公網發送消息會報SSL_HANDSHAKE
錯誤。
準備配置
可選:下載SSL根證書。如果是SSL接入點,需下載該證書。
- 訪問aliware-kafka-demos,單擊圖標,然后在下拉框選擇Download ZIP,下載Demo包并解壓。
在解壓的Demo工程中,找到kafka-confluent-python-demo文件夾,將此文件夾上傳到Linux系統。
修改配置文件setting.py。
默認接入點
登錄Linux系統,進入vpc文件目錄,修改配置文件setting.py。
kafka_setting = { 'bootstrap_servers': 'XXX:xxx,XXX:xxx', 'topic_name': 'XXX', 'group_name': 'XXX' }
參數
描述
bootstrap_servers
默認接入點。您可在云消息隊列 Kafka 版控制臺的實例詳情頁面的接入點信息區域獲取。
topic_name
Topic名稱。您可在云消息隊列 Kafka 版控制臺的Topic 管理頁面獲取。
group_name
Group名稱。您可在云消息隊列 Kafka 版控制臺的Group 管理頁面獲取。
SSL接入點
登錄Linux系統,進入vpc-ssl文件目錄,修改配置文件setting.py。
kafka_setting = { 'sasl_plain_username': 'XXX', 'sasl_plain_password': 'XXX', 'ca_location': '/XXX/mix-4096-ca-cert', 'bootstrap_servers': 'XXX:xxx,XXX:xxx', 'topic_name': 'XXX', 'group_name': 'XXX' }
參數
描述
sasl_plain_username
SASL用戶名。
說明- 如果實例未開啟ACL,您可以在云消息隊列 Kafka 版控制臺的實例詳情頁面的配置信息區域獲取默認的用戶名和密碼。
- 如果實例已開啟ACL,請確保要使用的SASL用戶已被授予向云消息隊列 Kafka 版實例收發消息的權限。具體操作,請參見SASL用戶授權。
sasl_plain_password
SASL用戶名密碼。
ca_location
SSL根證書的路徑。用本地路徑替換示例中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert。
bootstrap_servers
SSL接入點。您可在云消息隊列 Kafka 版控制臺的實例詳情頁面的接入點信息區域獲取。
topic_name
Topic名稱。您可在云消息隊列 Kafka 版控制臺的Topic 管理頁面獲取。
group_name
Group名稱。您可在云消息隊列 Kafka 版控制臺的Group 管理頁面獲取。
發送消息
執行以下命令發送消息(當前示例的Python版本為3.9)。
python kafka_producer.py
消息程序kafka_producer.py示例代碼如下:
默認接入點
from confluent_kafka import Producer import setting conf = setting.kafka_setting # 初始化一個Producer對象。 p = Producer({'bootstrap.servers': conf['bootstrap_servers']}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) # 異步發送消息。 p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) # 在程序結束時,調用flush。 p.flush()
SSL接入點
from confluent_kafka import Producer import setting conf = setting.kafka_setting p = Producer({'bootstrap.servers':conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'ssl.endpoint.identification.algorithm':'none', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password']}) def delivery_report(err, msg): if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) p.flush()
訂閱消息
執行以下命令訂閱消息(當前示例的Python版本為3.9)。
python kafka_consumer.py
消息程序kafka_consumer.py示例代碼如下:
默認接入點
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'group.id': conf['group_name'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()
SSL接入點
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password'], 'ssl.endpoint.identification.algorithm':'none', 'group.id': conf['group_name'], 'auto.offset.reset': 'latest', 'fetch.message.max.bytes':'1024*512' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()