本文為您介紹如何將Spark中的數據導入至ClickHouse集群。
前提條件
已創建Hadoop集群,詳情請參見創建集群。
已創建ClickHouse集群,詳情請參見創建ClickHouse集群。
背景信息
關于Spark的更多介紹,請參見概述。
代碼示例
代碼示例如下。
package com.company.packageName
import java.util.Properties
import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
import com.google.common.collect.ImmutableMap
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SaveMode, SparkSession}
case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
object CKDataImporter extends Logging {
private var dbName: String = "default"
private var tableName: String = ""
private var ckHost: String = ""
private var ckPort: String = "8123"
private var user: String = "default"
private var password: String = ""
private var local: Boolean = false
def main(args: Array[String]): Unit = {
parse(args.toList)
checkArguments()
val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
logInfo(s"Use jdbc: $jdbcUrl")
logInfo(s"Use table: $tableName")
val spark = getSparkSession
// generate test data
val rdd = spark.sparkContext.parallelize(1 to 1000).map(i => {
val rand = ThreadLocalRandom.current()
val randString = (0 until rand.nextInt(10, 20))
.map(_ => rand.nextLong())
.mkString("")
Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
})
val df = spark.createDataFrame(rdd)
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, tableName, getCKJdbcProperties(user, password))
}
private def printUsageAndExit(exitCode: Int = 0): Unit = {
logError("Usage: java -jar /path/to/CKDataImporter.jar [options]")
logError(" --dbName 設置ClickHouse數據庫的名稱,默認為default")
logError(" --tableName 設置ClickHouse庫中表的名稱")
logError(" --ckHost 設置ClickHouse地址")
logError(" --ckPort 設置ClickHouse端口,默認為8123")
logError(" --user 設置ClickHouse所使用的用戶名")
logError(" --password 設置ClickHouse用戶的密碼")
logError(" --local 設置此程序使用Spark Local模式運行")
System.exit(exitCode)
}
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--help" | "-h") :: _ =>
printUsageAndExit()
case "--dbName" :: value :: tail =>
dbName = value
parse(tail)
case "--tableName" :: value :: tail =>
tableName = value
parse(tail)
case "--ckHost" :: value :: tail =>
ckHost = value
parse(tail)
case "--ckPort" :: value :: tail =>
ckPort = value
parse(tail)
case "--user" :: value :: tail =>
user = value
parse(tail)
case "--password" :: value :: tail =>
password = value
parse(tail)
case "--local" :: tail =>
local = true
parse(tail)
case Nil =>
case _ =>
printUsageAndExit(1)
}
private def checkArguments(): Unit = {
if ("".equals(tableName) || "".equals(ckHost)) {
printUsageAndExit(2)
}
}
private def getCKJdbcProperties(
user: String,
password: String,
batchSize: String = "1000",
socketTimeout: String = "300000",
numPartitions: String = "8",
rewriteBatchedStatements: String = "true"): Properties = {
val kvMap = ImmutableMap.builder()
.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
.put("user", user)
.put("password", password)
.put("batchsize", batchSize)
.put("socket_timeout", socketTimeout)
.put("numPartitions", numPartitions)
.put("rewriteBatchedStatements", rewriteBatchedStatements)
.build()
val properties = new Properties
properties.putAll(kvMap)
properties
}
private def getSparkSession: SparkSession = {
val builder = SparkSession.builder()
if (local) {
builder.master("local[*]")
}
builder.appName("ClickHouse-Data-Importer")
builder.getOrCreate()
}
}
操作流程
步驟一:創建ClickHouse表
使用SSH方式登錄ClickHouse集群,詳情請參見登錄集群。
執行如下命令,啟動ClickHouse客戶端。
clickhouse-client -h core-1-1 -m
說明 本示例登錄core-1-1節點,如果您有多個Core節點,可以登錄任意一個節點。創建ClickHouse信息。
執行如下命令,創建數據庫clickhouse_database_name。
CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;
阿里云EMR會為ClickHouse集群自動生成一個名為cluster_emr的集群。數據庫名您可以自定義。
執行如下命令,創建表clickhouse_table_name_local。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}') ORDER BY id;
說明表名您可以自定義,但請確保表名是以_local結尾。layer、shard和replica是阿里云EMR為ClickHouse集群自動生成的宏定義,可以直接使用。
執行如下命令,創建與表clickhouse_table_name_local字段定義一致的表clickhouse_table_name_all。
說明表名您可以自定義,但請確保表名是以_all結尾。
CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr ( id UInt32, key1 String, value1 UInt8, key2 Int64, value2 Float64 ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
步驟二:編譯并打包
下載并解壓CKDataImporter示例到本地。
在CMD命令行中,進入到下載文件中pom.xml所在的目錄下,執行如下命令打包文件。
mvn clean package
根據您pom.xml文件中artifactId的信息,下載文件中的target目錄下會出現CKDataImporter-1.0.0.jar的JAR包。
步驟三:提交作業
使用SSH方式登錄Hadoop集群,詳情請參見登錄集群。
上傳打包好的CKDataImporter-1.0.0.jar至Hadoop集群的根目錄下。
說明本文示例中CKDataImporter-1.0.0.jar是上傳至root根目錄下,您也可以自定義上傳路徑。
執行如下命令提交作業。
spark-submit --master yarn \ --class com.aliyun.emr.CKDataImporter \ CKDataImporter-1.0.0.jar \ --dbName clickhouse_database_name \ --tableName clickhouse_table_name_all \ --ckHost ${clickhouse_host} \ --password ${password};
參數
說明
dbName
ClickHouse集群數據庫的名稱,默認為default。本文示例為clickhouse_database_name。
tableName
ClickHouse集群數據庫中表的名稱。本文示例為clickhouse_table_name_all。
ckHost
ClickHouse集群的Master節點的內網IP地址或公網IP地址。IP地址獲取方式,請參見獲取主節點的IP地址。
password
ClickHouse用戶的密碼。
您可以在ClickHouse服務的配置頁面,查看users.default.password參數,參數值即為ClickHouse用戶的密碼。
獲取主節點的IP地址
進入節點管理頁面。
在頂部菜單欄處,根據實際情況選擇地域和資源組。
在EMR on ECS頁面,單擊目標集群操作列的節點管理。
在節點管理頁面,單擊Master節點組所在行的圖標,復制公網IP列的IP地址。