本文為您介紹如何使用Flume同步EMR DataFlow集群的數據至EMR DataLake集群的Hive。
操作步驟
- 通過SSH方式連接DataLake集群,詳情請參見登錄集群。
- 創建Hive表。Flume使用事務操作將數據寫入Hive,需要在創建Hive表(flume_test)時設置transactional屬性。
create table flume_test (id int, content string) clustered by (id) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
Hive的基礎操作,請參見Hive基礎操作。
- 配置Flume。
- 進入Flume的配置頁面。
- 登錄EMR on ECS控制臺。
- 在頂部菜單欄處,根據實際情況選擇地域和資源組。
- 在集群管理頁面,單擊目標集群操作列的集群服務。
- 在集群服務頁面,單擊FLUME服務區域的配置。
- 單擊flume-conf.properties頁簽。本文示例采用的是全局配置方式,如果您想按照節點配置,可以在FLUME服務配置頁面的下拉列表中選擇獨立節點配置。
- 在flume-conf.properties的參數值中,添加以下內容。
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group # Describe the sink default-agent.sinks.k1.type = hive default-agent.sinks.k1.hive.metastore = thrift://xxxx:9083 default-agent.sinks.k1.hive.database = default default-agent.sinks.k1.hive.table = flume_test default-agent.sinks.k1.serializer = DELIMITED default-agent.sinks.k1.serializer.delimiter = "," default-agent.sinks.k1.serializer.serdeSeparator = ',' default-agent.sinks.k1.serializer.fieldnames =id,content default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
參數 描述 default-agent.sources.source1.kafka.bootstrap.servers Kafka集群Broker的Host和端口號。 default-agent.channels.c1.capacity 通道中存儲的最大事件數。請根據實際環境修改該參數值。 default-agent.channels.c1.transactionCapacity 每個事務通道將從源接收或提供給接收器的最大事件數。請根據實際環境修改該參數值。 default-agent.sinks.k1.hive.metastore Hive metastore的URI,格式為thrift://emr-header-1.cluster-xxx:9083。其中emr-header-1.cluster-xxx您可以通過 hostname
獲取。 - 保存配置。
- 單擊下方的保存。
- 在彈出的對話框中,輸入執行原因,單擊確定。
- 進入Flume的配置頁面。
- 啟動服務。
- 在FLUME服務頁面,選擇更多操作 > 重啟。
- 在彈出的對話框中,輸入執行原因,單擊確定。
- 在確認對話框中,單擊確定。
- 測試數據同步情況。
- 通過SSH方式連接DataFlow集群,詳情請參見登錄集群。
- 創建名稱為flume-test的Topic。
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
- 生成測試數據。
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092
例如輸入
abc
并回車。 - 通過SSH方式連接DataLake集群,在客戶端配置Hive參數并查詢表中的數據。
set hive.support.concurrency=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
配置好后查詢flume_test表中的數據。select * from flume_test;
返回信息如下:OK 1 a