日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

Airflow調度DLA Spark作業

更新時間:

Airflow是比較流行的開源調度工具,可以實現各類工作負載的DAG編排與調度。您可以通過Spark-Submit和Spark-SQL命令行來實現Airflow調度Spark任務。DLA Spark提供了命令行工具包,支持通過Spark-Submit和Spark-SQL方式來提交Spark作業。您可以直接將開源Spark命令行工具包替換成DLA Spark命令行工具包,并進行簡單的配置即可使用Airflow調度DLA Spark作業。

重要

云原生數據湖分析(DLA)產品已退市,云原生數據倉庫 AnalyticDB MySQL 版湖倉版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相關使用文檔,請參見Airflow調度Spark

準備工作

  • 安裝Airflow服務。

    1. 安裝Airflow服務并啟動。具體操作請參見Airflow社區文檔

    2. 安裝Airflow Spark插件。執行命令如下:

      pip3 install apache-airflow-providers-apache-spark
      說明
      • 您需要使用Python3來安裝Airflow Spark插件。

      • 安裝apache-airflow-providers-apache-spark會默認安裝社區版Pyspark,需要將其卸載,執行命令如下:

        pip3 uninstall pyspark
  • 下載DLA Spark命令行工具包并進行配置。

    1. 下載DLA Spark命令行工具包并進行配置。具體操作請參見Spark-Submit命令行工具

    2. 配置PATH路徑,執行命令如下:

      export PATH=$PATH:/your/dla/spark/path/bin
      說明

      在啟動Airflow scheduler之前需要將Spark-Submit和Spark-SQL命令加入到PATH中,否則調度任務可能會找不到Spark-Submit和Spark-SQL命令。

操作步驟

  1. 編輯DLA Spark Airflow DAG的dla_spark_demo.py文件。如下所示:

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    
    args = {
        'owner': 'Aliyun DLA',
    }
    
    with DAG(
        dag_id='example_dla_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        dla_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.sql.hive.metastore.version": "dla",
            "spark.dla.connectors": "oss",
            "spark.hadoop.job.oss.fileoutputcommitter.enable": "true"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf = dla_spark_conf,
            application="oss://your-bucket/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
    
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k+"="+v for k,v in dla_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
    
        submit_job >> sql_job
  2. 執行DLA Spark DAG。

    將編輯完成的dla_spark_demo.py文件放到Airflow安裝目錄的dags目錄下,然后執行DLA Spark DAG。具體操作請參見Airflow社區文檔

注意事項

  • DLA Spark的最小資源調度單元是容器,容器規格通過resourceSpec來定義。您可以通過配置spark.driver.resourceSpecspark.executor.resourceSpec來指定driver和executor的容器規格。Hadoop社區通過指定driver和executor的CPU和Memory來申請資源。DLA Spark工具包兼容了Hadoop的資源配置能力,如果您指定了driver和executor的CPU和Memory,會被自動轉換為大于所指定CPU和Memory的最小資源規格。例如,當executor_cores=2、executor_memory=5 G時,則會被轉換為spark.executor.resourceSpec=medium。

  • 對于DLA特有的一些參數,例如vcNameregionIdkeyIdsecretIdossUploadPath,您可以在DLA Spark工具包的配置文件conf/spark-defaults.conf中進行配置,也可以通過Airflow參數來配置。

  • 由于DLA Spark訪問DLA的元數據時,只支持外表,因此對于SparkJDBCOperatorcmd_type='jdbc_to_spark'并且save_mode="overwrite"的方式不支持。

    說明

    DLA Spark訪問自建Hive集群的元數據時,不存在該問題。關于如何訪問自建Hive元數據,請參見Hive

  • 如果您當前使用的是Airflow調度Livy的方式,目前還是需要改造成命令行的形式。DLA Spark團隊正在開發Livy兼容版本,以降低遷移成本,具體請聯系專家服務