
Import data from Flink to ClickHouse

This topic describes how to import data from Flink to a ClickHouse cluster.

Prerequisites

A ClickHouse cluster and a Flink cluster have been created.

Background information

For more information about Flink, see Apache Flink.

Code examples

The code examples are as follows. A standalone sketch that previews the generated test data appears after the two examples:

  • Stream processing

    package com.company.packageName
    
    import java.util.concurrent.ThreadLocalRandom
    
    import scala.annotation.tailrec
    
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala.{StreamTableEnvironment, table2RowDataStream}
    
    object StreamingJob {
    
      case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
    
      private var dbName: String = "default"
      private var tableName: String = ""
      private var ckHost: String = ""
      private var ckPort: String = "8123"
      private var user: String = "default"
      private var password: String = ""
    
      def main(args: Array[String]) {
        parse(args.toList)
        checkArguments()
    
        // set up the streaming execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env)
    
        val insertIntoCkSql =
          s"""
            | INSERT INTO $tableName (
            |   id, key1, value1, key2, value2
            | ) VALUES (
            |   ?, ?, ?, ?, ?
            | )
            |""".stripMargin
    
        val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    
        println(s"jdbc url: $jdbcUrl")
        println(s"insert sql: $insertIntoCkSql")
    
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(jdbcUrl)
          .setUsername(user)
          .setPassword(password)
          .setQuery(insertIntoCkSql)
          .setBatchSize(1000)
          .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
          .build()
    
        val data: DataStream[Test] = env.fromCollection(1 to 1000).map(i => {
          val rand = ThreadLocalRandom.current()
          val randString = (0 until rand.nextInt(10, 20))
            .map(_ => rand.nextLong())
            .mkString("")
          Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
        })
    
        val table = table2RowDataStream(tableEnv.fromDataStream(data))
        sink.emitDataStream(table.javaStream)
    
        // execute program
        env.execute("Flink Streaming Scala API Skeleton")
      }
    
      private def printUsageAndExit(exitCode: Int = 0): Unit = {
        println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
        println("  --dbName      設置ClickHouse數據庫的名稱,默認為default")
        println("  --tableName   設置ClickHouse庫中表的名稱")
        println("  --ckHost      設置ClickHouse地址")
        println("  --ckPort      設置ClickHouse端口,默認為8123")
        println("  --user        設置ClickHouse所使用的用戶名")
        println("  --password    設置ClickHouse用戶的密碼")
        System.exit(exitCode)
      }
    
      @tailrec
      private def parse(args: List[String]): Unit = args match {
        case ("--help" | "-h") :: _ =>
          printUsageAndExit()
        case "--dbName" :: value :: tail =>
          dbName = value
          parse(tail)
        case "--tableName" :: value :: tail =>
          tableName = value
          parse(tail)
        case "--ckHost" :: value :: tail =>
          ckHost = value
          parse(tail)
        case "--ckPort" :: value :: tail =>
          ckPort = value
          parse(tail)
        case "--user" :: value :: tail =>
          user = value
          parse(tail)
        case "--password" :: value :: tail =>
          password = value
          parse(tail)
        case Nil =>
        case _ =>
          printUsageAndExit(1)
      }
    
      private def checkArguments(): Unit = {
        if ("".equals(tableName) || "".equals(ckHost)) {
          printUsageAndExit(2)
        }
      }
    }
  • Batch processing

    package com.company.packageName
    
    import java.util.concurrent.ThreadLocalRandom
    
    import scala.annotation.tailrec
    
    import org.apache.flink.Utils
    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala.{BatchTableEnvironment, table2RowDataSet}
    
    object BatchJob {
    
      case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)
    
      private var dbName: String = "default"
      private var tableName: String = ""
      private var ckHost: String = ""
      private var ckPort: String = "8123"
      private var user: String = "default"
      private var password: String = ""
    
      def main(args: Array[String]) {
        parse(args.toList)
        checkArguments()
    
        // set up the batch execution environment
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tableEnv = BatchTableEnvironment.create(env)
    
        val insertIntoCkSql =
          s"""
            | INSERT INTO $tableName (
            |   id, key1, value1, key2, value2
            | ) VALUES (
            |   ?, ?, ?, ?, ?
            | )
            |""".stripMargin
        val jdbcUrl = s"jdbc:clickhouse://$ckHost:$ckPort/$dbName"
    
        println(s"jdbc url: $jdbcUrl")
        println(s"insert sql: $insertIntoCkSql")
    
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(jdbcUrl)
          .setUsername(user)
          .setPassword(password)
          .setQuery(insertIntoCkSql)
          .setBatchSize(1000)
          .setParameterTypes(Types.INT, Types.STRING, Types.BOOLEAN, Types.LONG, Types.DOUBLE)
          .build()
    
        val data = env.fromCollection(1 to 1000).map(i => {
          val rand = ThreadLocalRandom.current()
          val randString = (0 until rand.nextInt(10, 20))
            .map(_ => rand.nextLong())
            .mkString("")
          Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian())
        })
    
        val table = table2RowDataSet(tableEnv.fromDataSet(data))
    
        // Utils is bundled with the flink-clickhouse-demo project; it exposes the
        // Java DataSet behind the Scala DataSet, which the JDBC sink expects.
        sink.emitDataSet(Utils.convertScalaDatasetToJavaDataset(table))
    
        // execute program
        env.execute("Flink Batch Scala API Skeleton")
      }
    
      private def printUsageAndExit(exitCode: Int = 0): Unit = {
        println("Usage: flink run com.company.packageName.StreamingJob /path/to/flink-clickhouse-demo-1.0.0.jar [options]")
        println("  --dbName      設置ClickHouse數據庫的名稱,默認為default")
        println("  --tableName   設置ClickHouse庫中表的名稱")
        println("  --ckHost      設置ClickHouse地址")
        println("  --ckPort      設置ClickHouse端口,默認為8123")
        println("  --user        設置ClickHouse所使用的用戶名")
        println("  --password    設置ClickHouse用戶的密碼")
        System.exit(exitCode)
      }
    
      @tailrec
      private def parse(args: List[String]): Unit = args match {
        case ("--help" | "-h") :: _ =>
          printUsageAndExit()
        case "--dbName" :: value :: tail =>
          dbName = value
          parse(tail)
        case "--tableName" :: value :: tail =>
          tableName = value
          parse(tail)
        case "--ckHost" :: value :: tail =>
          ckHost = value
          parse(tail)
        case "--ckPort" :: value :: tail =>
          ckPort = value
          parse(tail)
        case "--user" :: value :: tail =>
          user = value
          parse(tail)
        case "--password" :: value :: tail =>
          password = value
          parse(tail)
        case Nil =>
        case _ =>
          printUsageAndExit(1)
      }
    
      private def checkArguments(): Unit = {
        if ("".equals(tableName) || "".equals(ckHost)) {
          printUsageAndExit(2)
        }
      }
    }
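
Both jobs generate the same random Test rows. The following standalone sketch is not part of the demo project (the object name PreviewData is made up for illustration); it reproduces the generation logic without a Flink runtime so that you can preview the rows that will be written to ClickHouse:

    package com.company.packageName

    import java.util.concurrent.ThreadLocalRandom

    object PreviewData {

      case class Test(id: Int, key1: String, value1: Boolean, key2: Long, value2: Double)

      def main(args: Array[String]): Unit = {
        // Same generation logic as StreamingJob and BatchJob, limited to the first 5 ids.
        (1 to 5).foreach { i =>
          val rand = ThreadLocalRandom.current()
          val randString = (0 until rand.nextInt(10, 20))
            .map(_ => rand.nextLong())
            .mkString("")
          println(Test(i, randString, rand.nextBoolean(), rand.nextLong(), rand.nextGaussian()))
        }
      }
    }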

Procedure

  1. Step 1: Create a ClickHouse table

  2. Step 2: Compile and package

  3. Step 3: Submit the job

Step 1: Create a ClickHouse table

  1. Log on to the ClickHouse cluster over SSH. For more information, see Log on to a cluster.

  2. Run the following command to start the ClickHouse client:

    clickhouse-client -h core-1-1 -m

    Note: This example logs on to the core-1-1 node. If you have multiple Core nodes, you can log on to any of them.

  3. Create the ClickHouse database and tables.

    1. Run the following command to create a database named clickhouse_database_name:

      CREATE DATABASE clickhouse_database_name ON CLUSTER cluster_emr;

      Alibaba Cloud EMR automatically generates a cluster named cluster_emr for the ClickHouse cluster. You can customize the database name.

    2. Run the following command to create a table named clickhouse_table_name_local:

      CREATE TABLE clickhouse_database_name.clickhouse_table_name_local ON CLUSTER cluster_emr (
        id        UInt32,
        key1      String,
        value1    UInt8,
        key2      Int64,
        value2    Float64
      ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/clickhouse_database_name/clickhouse_table_name_local', '{replica}')
      ORDER BY id;

      Note: You can customize the table name, but make sure that it ends with _local. {layer}, {shard}, and {replica} are macros that Alibaba Cloud EMR automatically generates for the ClickHouse cluster, and you can use them directly.

    3. Run the following command to create a table named clickhouse_table_name_all, with the same column definitions as clickhouse_table_name_local. (A sketch that verifies the tables over JDBC follows the statement.)

      Note: You can customize the table name, but make sure that it ends with _all.

      CREATE TABLE clickhouse_database_name.clickhouse_table_name_all ON CLUSTER cluster_emr (
        id                    UInt32,
        key1                  String,
        value1                UInt8,
        key2                  Int64,
        value2                Float64
      ) ENGINE = Distributed(cluster_emr, clickhouse_database_name, clickhouse_table_name_local, rand());
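
      After both tables are created, you can optionally confirm that they are reachable through the same JDBC endpoint that the Flink jobs use. This is a minimal sketch, not part of the demo project (CheckTables is an illustrative name); it assumes the ClickHouse JDBC driver ru.yandex.clickhouse is on the classpath and that you replace the placeholders:

      import java.sql.DriverManager

      object CheckTables {
        def main(args: Array[String]): Unit = {
          // Ensure the driver is registered (same driver class as in the jobs above).
          Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
          // Replace <ckHost> and <password> with your own values.
          val url = "jdbc:clickhouse://<ckHost>:8123/clickhouse_database_name"
          val conn = DriverManager.getConnection(url, "default", "<password>")
          try {
            // EXISTS TABLE returns 1 if the table is known to the server.
            val rs = conn.createStatement()
              .executeQuery("EXISTS TABLE clickhouse_database_name.clickhouse_table_name_all")
            if (rs.next()) println(s"clickhouse_table_name_all exists: ${rs.getInt(1) == 1}")
          } finally conn.close()
        }
      }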

Step 2: Compile and package

  1. Download flink-clickhouse-demo.tgz and extract it to your local machine.

  2. In the command line, go to the directory that contains pom.xml and run the following command to package the files:

    mvn clean package

    Based on the artifactId in your pom.xml file, a JAR package named flink-clickhouse-demo-1.0.0.jar is generated in the target directory.

Step 3: Submit the job

  1. Log on to the Flink cluster over SSH. For more information, see Log on to a cluster.

  2. Upload the packaged flink-clickhouse-demo-1.0.0.jar to the root directory of the Flink cluster.

    Note: In this example, flink-clickhouse-demo-1.0.0.jar is uploaded to the root directory. You can also customize the upload path.

  3. Run the following command to submit the job. (A sketch for checking the imported row count follows the parameter descriptions.)

    Example commands:

    • Streaming job

      flink run -m yarn-cluster \
                -c com.company.packageName.StreamingJob \
                flink-clickhouse-demo-1.0.0.jar \
                --dbName clickhouse_database_name \
                --tableName clickhouse_table_name_all \
                --ckHost ${clickhouse_host} \
                --password ${password};
    • Batch job

      flink run -m yarn-cluster \
                -c com.company.packageName.BatchJob \
                flink-clickhouse-demo-1.0.0.jar \
                --dbName clickhouse_database_name \
                --tableName clickhouse_table_name_all \
                --ckHost ${clickhouse_host} \
                --password ${password};

    Parameter    Description

    dbName       The name of the database in the ClickHouse cluster. Default value: default. In this example, clickhouse_database_name is used.

    tableName    The name of the table in the ClickHouse database. In this example, clickhouse_table_name_all is used.

    ckHost       The internal or public IP address of the master node of the ClickHouse cluster. For more information about how to obtain the IP address, see Obtain the IP address of the master node.

    password     The password of the ClickHouse user. You can obtain the password by checking the users.default.password parameter on the configuration page of the ClickHouse service.
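
    After a job finishes, you can check that the rows arrived in the distributed table. This is a minimal sketch under the same assumptions as the earlier check (the ClickHouse JDBC driver on the classpath; CountRows is an illustrative name; replace the placeholders):

    import java.sql.DriverManager

    object CountRows {
      def main(args: Array[String]): Unit = {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
        // Replace <ckHost> and <password> with your own values.
        val url = "jdbc:clickhouse://<ckHost>:8123/clickhouse_database_name"
        val conn = DriverManager.getConnection(url, "default", "<password>")
        try {
          val rs = conn.createStatement()
            .executeQuery("SELECT count() FROM clickhouse_table_name_all")
          // Each demo job writes 1,000 rows, so the count should grow in multiples of 1000.
          if (rs.next()) println(s"rows in clickhouse_table_name_all: ${rs.getLong(1)}")
        } finally conn.close()
      }
    }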

Obtain the IP address of the master node

  1. Go to the Nodes page.

    1. Log on to the EMR on ECS console.

    2. In the top navigation bar, select the region and resource group as needed.

    3. On the EMR on ECS page, click Nodes in the Actions column of the target cluster.

  2. On the Nodes page, click the expand icon in the row of the Master node group, and copy the IP address in the Public IP column.