日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Python SDK收發消息

本文介紹如何使用Python SDK通過接入點接入云消息隊列 Kafka 版并收發消息。

環境準備

添加Python依賴庫

執行以下命令安裝依賴庫。

pip install confluent-kafka==1.9.2
重要

建議您安裝confluent-kafka 1.9.2及以下版本的依賴庫,否則使用公網發送消息會報SSL_HANDSHAKE錯誤。

準備配置

  1. 可選:下載SSL根證書。如果是SSL接入點,需下載該證書。

  2. 訪問aliware-kafka-demos,單擊code圖標,然后在下拉框選擇Download ZIP,下載Demo包并解壓。
  3. 在解壓的Demo工程中,找到kafka-confluent-python-demo文件夾,將此文件夾上傳到Linux系統。

  4. 修改配置文件setting.py。

    默認接入點

    登錄Linux系統,進入vpc文件目錄,修改配置文件setting.py。

    kafka_setting = {
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    參數

    描述

    bootstrap_servers

    默認接入點。您可在云消息隊列 Kafka 版控制臺實例詳情頁面的接入點信息區域獲取。

    topic_name

    Topic名稱。您可在云消息隊列 Kafka 版控制臺Topic 管理頁面獲取。

    group_name

    Group名稱。您可在云消息隊列 Kafka 版控制臺Group 管理頁面獲取。

    SSL接入點

    登錄Linux系統,進入vpc-ssl文件目錄,修改配置文件setting.py。

    kafka_setting = {
        'sasl_plain_username': 'XXX',
        'sasl_plain_password': 'XXX',
        'ca_location': '/XXX/mix-4096-ca-cert',
        'bootstrap_servers': 'XXX:xxx,XXX:xxx',
        'topic_name': 'XXX',
        'group_name': 'XXX'
    }
    

    參數

    描述

    sasl_plain_username

    SASL用戶名。

    說明
    • 如果實例未開啟ACL,您可以在云消息隊列 Kafka 版控制臺實例詳情頁面的配置信息區域獲取默認的用戶名密碼
    • 如果實例已開啟ACL,請確保要使用的SASL用戶已被授予向云消息隊列 Kafka 版實例收發消息的權限。具體操作,請參見SASL用戶授權

    sasl_plain_password

    SASL用戶名密碼。

    ca_location

    SSL根證書的路徑。用本地路徑替換示例中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert

    bootstrap_servers

    SSL接入點。您可在云消息隊列 Kafka 版控制臺實例詳情頁面的接入點信息區域獲取。

    topic_name

    Topic名稱。您可在云消息隊列 Kafka 版控制臺Topic 管理頁面獲取。

    group_name

    Group名稱。您可在云消息隊列 Kafka 版控制臺Group 管理頁面獲取。

發送消息

執行以下命令發送消息(當前示例的Python版本為3.9)。

python kafka_producer.py

消息程序kafka_producer.py示例代碼如下:

  • 默認接入點

    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    # 初始化一個Producer對象。
    p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
    
    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    # 異步發送消息。
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    # 在程序結束時,調用flush。
    p.flush()
  • SSL接入點

    from confluent_kafka import Producer
    import setting
    
    conf = setting.kafka_setting
    
    p = Producer({'bootstrap.servers':conf['bootstrap_servers'],
       'ssl.endpoint.identification.algorithm': 'none',
       'sasl.mechanisms':'PLAIN',
       'ssl.ca.location':conf['ca_location'],
       'security.protocol':'SASL_SSL',
       'ssl.endpoint.identification.algorithm':'none',
       'sasl.username':conf['sasl_plain_username'],
       'sasl.password':conf['sasl_plain_password']})
    
    
    def delivery_report(err, msg):
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    
    p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
    p.poll(0)
    
    p.flush()

訂閱消息

執行以下命令訂閱消息(當前示例的Python版本為3.9)。

python kafka_consumer.py

消息程序kafka_consumer.py示例代碼如下:

  • 默認接入點

    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print("Consumer error: {}".format(msg.error()))
                continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()
  • SSL接入點

    from confluent_kafka import Consumer, KafkaError
    
    import setting
    
    conf = setting.kafka_setting
    
    c = Consumer({
        'bootstrap.servers': conf['bootstrap_servers'],
        'ssl.endpoint.identification.algorithm': 'none',
        'sasl.mechanisms':'PLAIN',
        'ssl.ca.location':conf['ca_location'],
        'security.protocol':'SASL_SSL',
        'sasl.username':conf['sasl_plain_username'],
        'sasl.password':conf['sasl_plain_password'],
        'ssl.endpoint.identification.algorithm':'none',
        'group.id': conf['group_name'],
        'auto.offset.reset': 'latest',
        'fetch.message.max.bytes':'1024*512'
    })
    
    c.subscribe([conf['topic_name']])
    
    while True:
        msg = c.poll(1.0)
    
        if msg is None:
            continue
        if msg.error():
           if msg.error().code() == KafkaError._PARTITION_EOF:
              continue
           else:
               print("Consumer error: {}".format(msg.error()))
               continue
    
        print('Received message: {}'.format(msg.value().decode('utf-8')))
    
    c.close()