日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

JAR作業開發

更新時間:

Flink DataStream提供了更靈活的編程模型和API,可以自定義各種數據轉換、操作和算子,適用于復雜的業務邏輯和數據處理需求。本文為您介紹Flink JAR作業的開發方法。

支持開源Apache Flink

目前實時計算Flink支持的DataStream API完全兼容開源的Flink版本,詳情請參見Apache Flink介紹Flink DataStream API開發指南

開發環境要求

  • 已安裝IntelliJ IDEA等開發工具。

  • 已安裝3.6.3及以上版本的Maven。

  • 作業開發需要使用JDK 1.8版本。

  • JAR作業需要您在線下完成開發,再在Flink全托管控制臺上部署并運行。

開發準備

本樣例涉及關于數據源連接器如何使用,請準備好相關數據源。

說明

作業開發

Maven環境配置(可選)

配置Maven的setting.xml文件。如果您在后續的操作中,對Maven中央倉庫的訪問存在無法拉取或速率較慢的情況,可以更換為阿里云鏡像倉庫。

  <mirror>
      <id>aliyunmaven</id>
        <mirrorOf>central</mirrorOf>
        <name>Aliyun Maven</name>
        <url>https://maven.aliyun.com/repository/public</url>
    </mirror>

配置Flink環境依賴

說明

為了避免JAR包依賴沖突,請您注意以下幾點:

  • ${flink.version}為作業運行對應的Flink版本。請使用與作業部署頁面選擇的VVR引擎所使用的Flink版本一致。例如您在部署頁面選擇的引擎為1.17-vvr-8.0.4-1,其對應的Flink版本為1.17.0,查看VVR引擎版本詳情請參見如何查看當前作業的Flink版本?

  • Flink相關依賴,作用域請使用provided,即在依賴中添加<scope>provided</scope>。主要包含org.apache.flink組下以flink-開頭的非Connector依賴。

  • Flink源代碼中只有明確標注了@Public或者@PublicEvolving的才是公開供用戶調用的方法,阿里云實時計算Flink版只對這些方法的兼容性做出產品保證。

  • 如果是Flink服務內置的Connector支持的DataStream API,建議使用其內置的依賴。

下面是Flink的一些基本相關依賴,您可能還需要補充一些日志文件相關的依賴,完整的依賴參考請參見文末的完整示例代碼

flink相關依賴
         <!-- Apache Flink 依賴項 -->
        <!-- 之所以提供這些依賴項,是因為它們不應該打包到JAR文件中。 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

連接器依賴和使用

通過DataStream的方式讀寫數據,需要使用對應的DataStream連接器連接Flink全托管。Maven中央倉庫已經放置了VVR DataStream連接器,以供您在作業開發時直接使用。

重要

請使用我們在支持的連接器中指明提供DataStream API的連接器。如果某個連接器未注明提供給DataStream API,請勿自行使用,因為未來接口和參數可能會被修改。

您可以選擇以下任意一種方式來使用連接器:

(推薦)上傳連接器Uber JAR包到Flink開發控制臺,部署作業時作為附加依賴文件引入

  1. 在作業的Maven POM文件中添加您需要的連接器作為項目依賴,其作用域為provided。完整的依賴文件請參考文末的完整示例代碼

    說明
    • ${vvr.version}是作業運行環境引擎版本,如您的作業運行在1.17-vvr-8.0.4-1版本引擎上,其對應的Flink版本為1.17.0。建議您使用最新的引擎,具體版本詳見引擎

    • 由于將連接器的Uber JAR包作為附加依賴文件引入,則無需將該依賴打入JAR包中,所以需要聲明作用域為provided

            <!-- Kafka 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
            <!-- MySQL 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
                <scope>provided</scope>
            </dependency>
  2. 如果您有開發新連接器或者拓展現有連接器功能的需求,項目還需要依賴連接器公共包flink-connector-baseververica-connector-common

            <!-- Flink 連接器公共接口基礎依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 阿里云連接器公共接口基礎依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStream連接配置信息和代碼示例需要查看對應的DataStream連接器文檔。

    支持作為DataStream類型的連接器列表,請參見支持的連接器

  4. 部署作業并在附加依賴文件項中添加相應的連接器Uber JAR包,詳情請參見部署JAR作業。您可以上傳您自己開發的連接器,也可以上傳實時計算Flink版提供的連接器(下載地址請參見Connector列表)。如圖所示。

    image

直接將連接器作為項目依賴打進作業JAR包

  1. 在作業的Maven POM文件中添加您需要的連接器作為項目依賴。例如引入Kafka連接器和MySQL連接器。

    說明
    • ${vvr.version}是作業運行環境引擎版本,如您的作業運行在1.17-vvr-8.0.4-1版本引擎上,其對應的Flink版本為1.17.0。建議您使用最新的引擎,具體版本詳見引擎

    • 由于將連接器作為項目依賴直接打入JAR包,它們必須在默認作用域(compile)中。

            <!-- Kafka 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
            </dependency>
            <!-- MySQL 連接器依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  2. 如果您有開發新連接器或者拓展現有連接器功能的需求,項目還需要依賴連接器公共包flink-connector-baseververica-connector-common

            <!-- Flink 連接器公共接口基礎依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 阿里云連接器公共接口基礎依賴 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  3. DataStream連接配置信息和代碼示例需要查看對應的DataStream連接器文檔。

    支持作為DataStream類型的連接器列表,請參見支持的連接器

OSS附加依賴文件讀取

因為Flink JAR作業不支持在Main函數中讀取本地配置,您可以將配置文件上傳到Flink工作空間下的OSS Bucket,在部署JAR作業時,通過添加附加配置文件的方式進行讀取。示例如下。

  1. 創建配置文件config.properties,避免在代碼中出現明文代碼。

    # Kafka 
    bootstrapServers=host1:9092,host2:9092,host3:9092
    inputTopic=topic
    groupId=groupId
    # MySQL
    database.url=jdbc:mysql://localhost:3306/my_database
    database.username=username
    database.password=password
  2. 在JAR作業中使用代碼讀取存儲在OSS Bucket上的配置文件config.properties。

    方式一:讀取工作空間綁定的OSS Bucket

    1. 實時計算開發控制臺左側導航欄資源管理頁面,上傳該文件。

    2. 在作業運行時,部署作業所添加附加依賴文件將會加載到作業所運行Pod的/flink/usrlib目錄下。

    3. 讀取該配置文件代碼示例如下。

                  Properties properties = new Properties();
                  Map<String,String> configMap = new HashMap<>();
      
                  try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
                      // 加載屬性文件
                      properties.load(input);
                      // 獲取屬性值
                      configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
                      configMap.put("inputTopic",properties.getProperty("inputTopic"));
                      configMap.put("groupId",properties.getProperty("groupId"));
                      configMap.put("url",properties.getProperty("database.url")) ;
                      configMap.put("username",properties.getProperty("database.username"));
                      configMap.put("password",properties.getProperty("database.password"));
                  } catch (IOException ex) {
                      ex.printStackTrace();
                  }

    方式二:讀取工作空間有權限訪問的OSS Bucket

    1. 將配置文件上傳目標OSS Bucket。

    2. 通過OSSClient直接讀取OSS上的存儲文件詳情,請參見流式傳輸管理訪問憑據。代碼示例如下。

      OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret");
      try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties");
           BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) {
          // read file and process ...
      } finally {
          if (ossClient != null) {
              ossClient.shutdown();
          }
      }

業務代碼編寫

  1. 將外部數據源集成到Flink數據流程序。Watermark是Flink一種基于時間語義的計算策略,往往伴隨著時間戳一起使用,所以本示例不使用水印策略。詳情請參考水印策略

             // 將外部數據源集成到flink數據流程序
            // WatermarkStrategy.noWatermarks() 指沒有使用水印策略
            DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
  2. 算子轉換處理。示例中將DataStream<Srting>轉換成DataStream<Student>,更多復雜的算子轉化和處理方式請參考Flink算子

              // 轉換數據結構為student的算子
              DataStream<student> source = stream
                    .map(new MapFunction<String, student>() {
                        @Override
                        public student map(String s) throws Exception {
                            // 數據由逗號分隔
                            String[] data = s.split(",");
                            return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                        }
                    }).filter(student -> student.score >=60); // 篩選出分數大于60分的數據

作業打包

通過maven-shade-plugin插件打包。

重要
  • 如果選擇作為附加依賴文件引入使用連接器,打包作業時,確認連接器相關依賴的作用域為provided

  • 如果選擇連接器作為依賴一起打包,作用域默認(compile)即可。

maven-shade-plugin插件依賴參考

<build>
        <plugins>
            <!-- Java 編譯器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- 我們使用maven-shade 創建一個包含所有必須依賴的 fat jar -->
            <!-- 修改<mainClass>的值.如果您的程序入口點發生了改變 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 去掉一些不必要的依賴性 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- 不要復制META-INF文件夾中的簽名。否則,這可能會在使用JAR文件時導致安全異常 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

作業測試及部署

  • 由于實時計算Flink版默認不具備訪問公網的能力,可能您的代碼無法在本地進行直接測試。建議您分開進行單元測試,詳情請參見本地運行和調試包含連接器的作業

  • JAR作業部署請參見部署JAR作業

    說明
    • 部署時,如果選擇方式一使用連接器打包的作業,切記需要上傳添加連接器相關的Uber JAR包。

    • 如果需要讀取配置文件,也需要在附加依賴文件中上傳添加。

    image

完整示例代碼

本示例代碼中,將Kafka數據源的數據進行處理后寫入MySQL。此示例僅供參考,更多的代碼風格和質量指南請參見代碼風格和質量指南

FlinkDemo.java

package com.aliyun;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkDemo {
    // 定義數據結構
    public static class Student {
        public int id;
        public String name;
        public int score;

        public Student(int id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    public static void main(String[] args) throws Exception {
        // 創建Flink執行環境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        Map<String,String> configMap = new HashMap<>();

        try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
            // 加載屬性文件
            properties.load(input);
            // 獲取屬性值
            configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
            configMap.put("inputTopic",properties.getProperty("inputTopic"));
            configMap.put("groupId",properties.getProperty("groupId"));
            configMap.put("url",properties.getProperty("database.url")) ;
            configMap.put("username",properties.getProperty("database.username"));
            configMap.put("password",properties.getProperty("database.password"));
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        // Build Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                        .setBootstrapServers(configMap.get("bootstrapServers"))
                        .setTopics(configMap.get("inputTopic"))
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setGroupId(configMap.get("groupId"))
                        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                        .build();

        // 將外部數據源集成到flink數據流程序
        // WatermarkStrategy.noWatermarks() 指沒有使用水印策略
        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");

        // 篩選出分數大于60分的數據
        DataStream<Student> source = stream
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String s) throws Exception {
                        String[] data = s.split(",");
                        return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
                    }
                }).filter(Student -> Student.score >=60);

        source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
                new JdbcStatementBuilder<Student>() {
                    public void accept(PreparedStatement ps, Student data) {
                        try {
                            ps.setInt(1, data.id);
                            ps.setString(2, data.name);
                            ps.setInt(3, data.score);
                        } catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                },
                new JdbcExecutionOptions.Builder()
                        .withBatchSize(5) // 每次批量寫入的記錄數
                        .withBatchIntervalMs(2000) // 重試時的最大延遲時間(毫秒)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(configMap.get("url"))
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(configMap.get("username"))
                        .withPassword(configMap.get("password"))
                        .build()
        )).name("Sink MySQL");

        env.execute("Flink Demo");
    }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.aliyun</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>FlinkDemo</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.14.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Apache Flink 依賴項 -->
        <!-- 之所以提供這些依賴項,是因為它們不應該打包到JAR文件中。 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- 在這里添加連接器依賴項。它們必須在默認作用域(compile)中。 -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        <!-- 添加日志框架,以便在IDE中運行時生成控制臺輸出 -->
        <!-- 默認情況下,這些依賴項從應用程序JAR中排除 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java 編譯器-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- 我們使用maven-shade 創建一個包含所有必須依賴的 fat jar -->
            <!-- 修改<mainClass>的值.如果您的程序入口點發生了改變 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <!-- 去掉一些不必要的依賴性 -->
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- 不要復制META-INF文件夾中的簽名。否則,這可能會在使用JAR文件時導致安全異常 -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aliyun.FlinkDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

相關文檔