日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Airflow調度Spark

Airflow是比較流行的開源調度工具,可以實現各類工作負載的DAG編排與調度。您可以通過AnalyticDB for MySQL Spark Airflow Operator、Spark-Submit命令行工具來實現Airflow調度Spark任務。本文介紹如何通過Airflow調度AnalyticDB for MySQL Spark作業。

注意事項

  • AnalyticDB for MySQL Spark支持的配置參數,請參見Spark應用配置參數說明

  • 如果您使用的是Apache Livy的調度方式,AnalyticDB for MySQL Spark Livy Proxy相關工具會在近期發布,可與維護團隊聯系申請邀測使用。

Spark Airflow Operator命令行工具

準備工作

  1. 安裝Airflow服務并啟動。具體操作,請參見Airflow社區文檔

  2. 安裝Airflow Spark插件。執行如下命令:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl

操作步驟

  1. 準備Connection,示例如下。具體操作,請參見創建Connection

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }
  2. 創建DAG聲明Spark工作流,本文的DAG聲明文件為 spark_dags.py

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        start_date=datetime(2021, 1, 1),
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
    
        spark_batch = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="oss://<bucket_name>/tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi"
        )
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_batch >> spark_sql
    

    參數說明如下。

    AnalyticDBSparkBatchOperator支持配置的參數。

    參數

    是否必填

    說明

    file

    Spark應用主文件的存儲路徑,文件路徑需為絕對路徑。主文件是入口類所在的JAR包或者Python的入口執行文件。

    重要

    Spark應用主文件目前只支持存儲在OSS中。

    OSS Bucket與AnalyticDB for MySQL集群需要在同一地域。

    class_name

    • Java或Scala程序入口類名稱,必填參數。

    • Python不需要指定入口類,非必填參數。

    args

    Spark應用參數。

    conf

    與開源Spark中的配置項基本一致,參數格式為key: value形式。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明

    jars

    Spark應用依賴的JAR包。需填寫JAR包文件的絕對路徑。JAR包在運行時會被加入到Driver和Executor JVM的ClassPath里面。

    重要

    Spark應用所依賴的所有JAR包必須存儲在OSS中。

    OSS Bucket與AnalyticDB for MySQL集群需要在同一地域。

    py_files

    PySpark依賴的Python文件,后綴可以是ZIP、PY和EGG。如果依賴多個Python文件,建議使用ZIP或者EGG壓縮包。您可以在Python代碼中以module方式引用Python文件。

    重要

    Spark應用所依賴的所有Python文件須存儲在OSS中。

    files

    Spark應用依賴的文件資源,文件會被下載到Driver和Executor進程的當前執行目錄下。

    支持配置文件別名,例如oss://<testBucketName>/test/test1.txt#test1,test1為文件別名,您可以使用./test1或者./test1.txt訪問文件。

    說明

    files中包含名為log4j.properties的文件時,Spark會使用該log4j.properties文件作為日志配置。

    Spark應用所依賴的所有文件須存儲在OSS中。

    driver_resource_spec

    Spark driver的資源規格。默認值為medium。

    不同型號的取值對應不同的規格,詳情請參見Spark資源規格列表的型號列。

    說明

    spark.driver.resourceSpecspark.executor.resourceSpec參數取值相同。

    僅提交Spark離線應用時,可使用開源Spark參數,且取值需為Spark資源規格列表中的核數和內存。

    executor_resource_spec

    Spark executor的資源規格。默認值為medium。

    不同型號的取值對應不同的規格,詳情請參見Spark資源規格列表的型號列。

    num_executors

    Spark Executor個數。默認值為3。

    archives

    Spark應用依賴的壓縮包資源,目前支持.TAR.GZ后綴。壓縮包會被解壓到當前Spark進程的當前目錄下。

    支持配置文件別名,例如oss://testBucketName/test/test1.tar.gz#test1,test1為文件別名。假設test2.txt是test1.tar.gz壓縮包中的文件,您可以使用./test1/test2.txt或者./test1.tar.gz/test2.txt訪問解壓后的文件。

    說明

    Spark應用所依賴的所有壓縮包須存儲在OSS中。壓縮包解壓縮失敗,任務會失敗。

    name

    Spark應用名稱。

    cluster_id

    AnalyticDB for MySQL企業版、基礎版及湖倉版集群ID。

    rg_name

    AnalyticDB for MySQL企業版、基礎版及湖倉版集群的Job型資源組名稱。

    adb_spark_conn_id

    AnalyticDB for MySQL Spark Airflow Connection ID。默認值為adb_spark_default

    region

    AnalyticDB for MySQL企業版、基礎版及湖倉版集群所屬地域ID。

    polling_interval

    掃描Spark應用狀態周期。

    AnalyticDBSparkSQLOperator支持配置的參數。

    參數

    是否必填

    說明

    SQL

    Spark SQL語句。

    conf

    與開源Spark中的配置項基本一致,參數格式為key: value形式。與開源Spark用法不一致的配置參數及AnalyticDB for MySQL特有的配置參數,請參見Spark應用配置參數說明

    driver_resource_spec

    Spark driver的資源規格。默認值為medium。

    不同型號的取值對應不同的規格,詳情請參見Spark資源規格列表的型號列。

    說明

    spark.driver.resourceSpecspark.executor.resourceSpec參數取值相同。

    僅提交Spark離線應用時,可使用開源Spark參數,且取值需為Spark資源規格列表中的核數和內存。

    executor_resource_spec

    Spark executor的資源規格。默認值為medium。

    不同型號的取值對應不同的規格,詳情請參見Spark資源規格列表的型號列。

    num_executors

    Spark Executor個數。默認值為3。

    name

    Spark應用名稱。

    cluster_id

    AnalyticDB for MySQL企業版、基礎版及湖倉版集群ID。

    rg_name

    AnalyticDB for MySQL企業版、基礎版及湖倉版集群的Job型資源組名稱。

    adb_spark_conn_id

    AnalyticDB for MySQL Spark Airflow Connection ID。默認值為adb_spark_default

    region

    AnalyticDB for MySQL企業版、基礎版及湖倉版集群所屬地域ID。

    polling_interval

    掃描Spark應用狀態周期。

  3. spark_dags.py文件存放至Airflow Configuration聲明dags_folder所在的文件夾中。

  4. 執行DAG。具體操作請參見Airflow社區文檔

Spark-Submit命令行工具

說明

對于AnalyticDB for MySQL特有的配置項,例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在AnalyticDB for MySQL Spark工具包的配置文件conf/spark-defaults.conf中進行配置,也可以通過Airflow參數來配置。詳情請參見Spark應用配置參數

準備工作

準備工作一:安裝Airflow服務

  1. 安裝Airflow服務并啟動。具體操作請參見Airflow社區文檔

  2. 安裝Airflow Spark插件。執行如下命令:

    pip3 install apache-airflow-providers-apache-spark
    重要
    • 您需要使用Python3來安裝Airflow Spark插件。

    • 安裝apache-airflow-providers-apache-spark會默認安裝社區版Pyspark,需要執行如下命令將pyspark卸載。

      pip3 uninstall pyspark

準備工作二:下載并配置Spark-Submit命令行工具

  1. 下載Spark-Submit命令行工具包并進行配置。具體操作請參見通過Spark-Submit命令行工具開發Spark應用

  2. 配置PATH路徑。執行以下命令,將Spark-Submit命令行工具的地址加入Airflow執行地址。

    export PATH=$PATH:</your/adb/spark/path/bin>
    重要

    在啟動Airflow之前需要將Spark-Submit加入到PATH中,否則調度任務可能會找不到Spark-Submit命令。

操作步驟

  1. 準備DAG聲明文件。本文以創建Airflow DAG的demo.py文件為例。

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  2. 將編輯完成的demo.py文件放至Airflow安裝目錄的dags目錄下。

  3. 執行DAG。具體操作請參見Airflow社區文檔