日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過開源Kafka客戶端寫入Lindorm流引擎數據

Lindorm流引擎完全兼容開源Kafka API,您可以通過Kafka API編寫程序寫入Lindorm流引擎數據,也可以通過開源的三方工具采集并寫入Lindorm流引擎數據,例如FluentD、Debezium等。本文介紹通過開源Kafka客戶端連接Lindorm流引擎并寫入Lindorm流引擎數據的代碼示例。

前提條件

  • 已安裝Java環境,要求安裝JDK 1.7及以上版本。

  • 已將客戶端IP地址添加至Lindorm實例的白名單中,具體操作請參見設置白名單

  • 已獲取Lindorm流引擎的Lindorm Stream Kafka地址,具體操作請參見查看連接地址

    說明

    Lindorm流引擎的Lindorm Stream Kafka地址為專有網絡地址,需確保應用程序部署的環境和Lindorm實例使用相同的專有網絡ID。

操作步驟

  1. 下載開源Kafka客戶端。在pom.xml中配置Maven依賴,具體內容如下:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
  2. 連接Lindorm流引擎并寫入Lindorm流引擎數據。完整的代碼示例如下:

    說明
    • 寫入的數據格式支持JSON、Avro和CSV。

    • 代碼中Lindorm Stream Kafka地址為專有網絡地址,獲取方法請參見查看連接地址

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.codehaus.jettison.json.JSONObject;
    
    import java.util.Properties;
    import java.util.concurrent.Future;
    
    public class KafkaToLindormStreamDemo {
    
        public static void main(String[] args) {
            Properties props = new Properties();
    
            //設置Lindorm Stream Kafka地址,這個Lindorm Stream Kafka地址為專有網絡地址,需確保應用程序部署的環境和Lindorm實例使用相同的專有網絡ID。
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Lindorm Stream Kafka地址");
           //指定數據流表的物理數據存儲在某個Topic上
            String topic = "log_topic";
    
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            try {
                JSONObject json = new JSONObject();
                //寫入流引擎數據
                json.put("timestamp", System.currentTimeMillis());
                json.put("loglevel", "ERROR");
                json.put("thread", "[ReportFinishedTask7-thread-4]");
                json.put("class", "engine.ImporterTaskManager(318)");
                json.put("detail", "Remove tasks fail: job name=e35318e5-52ea-48ab-ad2a-0144ffc6955e , task name=prepare_e35318e5-52ea-48ab-ad2a-0144ffc6955e , runningTasks=0");
                Future<RecordMetadata> future = producer.send(
              new ProducerRecord<String, String>(topic, json.getString("thread") + json.getLong("timestamp"),
                  json.toString()));
                producer.flush();
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("Produce ok:" + recordMetadata.toString());
                } catch (Throwable t) {
                    System.out.println("Produce exception " + t.getMessage());
                    t.printStackTrace();
                }
            } catch (Exception e) {
                System.out.println("Produce exception " + e.getMessage());
                e.printStackTrace();
            }
        }
    }