Lindorm流引擎完全兼容開源Kafka API,您可以通過Kafka API編寫程序寫入Lindorm流引擎數據,也可以通過開源的三方工具采集并寫入Lindorm流引擎數據,例如FluentD、Debezium等。本文介紹通過開源Kafka客戶端連接Lindorm流引擎并寫入Lindorm流引擎數據的代碼示例。
前提條件
操作步驟
下載開源Kafka客戶端。在pom.xml中配置Maven依賴,具體內容如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.2</version> </dependency>
連接Lindorm流引擎并寫入Lindorm流引擎數據。完整的代碼示例如下:
說明寫入的數據格式支持JSON、Avro和CSV。
代碼中Lindorm Stream Kafka地址為專有網絡地址,獲取方法請參見查看連接地址。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.codehaus.jettison.json.JSONObject; import java.util.Properties; import java.util.concurrent.Future; public class KafkaToLindormStreamDemo { public static void main(String[] args) { Properties props = new Properties(); //設置Lindorm Stream Kafka地址,這個Lindorm Stream Kafka地址為專有網絡地址,需確保應用程序部署的環境和Lindorm實例使用相同的專有網絡ID。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Lindorm Stream Kafka地址"); //指定數據流表的物理數據存儲在某個Topic上 String topic = "log_topic"; props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); try { JSONObject json = new JSONObject(); //寫入流引擎數據 json.put("timestamp", System.currentTimeMillis()); json.put("loglevel", "ERROR"); json.put("thread", "[ReportFinishedTask7-thread-4]"); json.put("class", "engine.ImporterTaskManager(318)"); json.put("detail", "Remove tasks fail: job name=e35318e5-52ea-48ab-ad2a-0144ffc6955e , task name=prepare_e35318e5-52ea-48ab-ad2a-0144ffc6955e , runningTasks=0"); Future<RecordMetadata> future = producer.send( new ProducerRecord<String, String>(topic, json.getString("thread") + json.getLong("timestamp"), json.toString())); producer.flush(); try { RecordMetadata recordMetadata = future.get(); System.out.println("Produce ok:" + recordMetadata.toString()); } catch (Throwable t) { System.out.println("Produce exception " + t.getMessage()); t.printStackTrace(); } } catch (Exception e) { System.out.println("Produce exception " + e.getMessage()); e.printStackTrace(); } } }
文檔內容是否對您有幫助?