Airflow是比較流行的開源調度工具,可以實現各類工作負載的DAG編排與調度。您可以通過Spark-Submit和Spark-SQL命令行來實現Airflow調度Spark任務。DLA Spark提供了命令行工具包,支持通過Spark-Submit和Spark-SQL方式來提交Spark作業。您可以直接將開源Spark命令行工具包替換成DLA Spark命令行工具包,并進行簡單的配置即可使用Airflow調度DLA Spark作業。
云原生數據湖分析(DLA)產品已退市,云原生數據倉庫 AnalyticDB MySQL 版湖倉版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相關使用文檔,請參見Airflow調度Spark。
準備工作
安裝Airflow服務。
安裝Airflow服務并啟動。具體操作請參見Airflow社區文檔。
安裝Airflow Spark插件。執行命令如下:
pip3 install apache-airflow-providers-apache-spark
說明您需要使用Python3來安裝Airflow Spark插件。
安裝apache-airflow-providers-apache-spark會默認安裝社區版Pyspark,需要將其卸載,執行命令如下:
pip3 uninstall pyspark
下載DLA Spark命令行工具包并進行配置。
下載DLA Spark命令行工具包并進行配置。具體操作請參見Spark-Submit命令行工具。
配置PATH路徑,執行命令如下:
export PATH=$PATH:/your/dla/spark/path/bin
說明在啟動Airflow scheduler之前需要將Spark-Submit和Spark-SQL命令加入到PATH中,否則調度任務可能會找不到Spark-Submit和Spark-SQL命令。
操作步驟
編輯DLA Spark Airflow DAG的dla_spark_demo.py文件。如下所示:
from airflow.models import DAG from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago args = { 'owner': 'Aliyun DLA', } with DAG( dag_id='example_dla_spark_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: dla_spark_conf = { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium", "spark.sql.hive.metastore.version": "dla", "spark.dla.connectors": "oss", "spark.hadoop.job.oss.fileoutputcommitter.enable": "true" } # [START howto_operator_spark_submit] submit_job = SparkSubmitOperator( conf = dla_spark_conf, application="oss://your-bucket/jar/pi.py", task_id="submit_job", verbose=True ) # [END howto_operator_spark_submit] # [START howto_operator_spark_sql] sql_job = SparkSqlOperator( conn_id="spark_default", sql="SELECT * FROM yourdb.yourtable", conf=",".join([k+"="+v for k,v in dla_spark_conf.items()]), task_id="sql_job", verbose=True ) # [END howto_operator_spark_sql] submit_job >> sql_job
執行DLA Spark DAG。
將編輯完成的dla_spark_demo.py文件放到Airflow安裝目錄的dags目錄下,然后執行DLA Spark DAG。具體操作請參見Airflow社區文檔。
注意事項
DLA Spark的最小資源調度單元是容器,容器規格通過
resourceSpec
來定義。您可以通過配置spark.driver.resourceSpec
和spark.executor.resourceSpec
來指定driver和executor的容器規格。Hadoop社區通過指定driver和executor的CPU和Memory來申請資源。DLA Spark工具包兼容了Hadoop的資源配置能力,如果您指定了driver和executor的CPU和Memory,會被自動轉換為大于所指定CPU和Memory的最小資源規格。例如,當executor_cores
=2、executor_memory
=5 G時,則會被轉換為spark.executor.resourceSpec
=medium。對于DLA特有的一些參數,例如
vcName
、regionId
、keyId
、secretId
、ossUploadPath
,您可以在DLA Spark工具包的配置文件conf/spark-defaults.conf中進行配置,也可以通過Airflow參數來配置。由于DLA Spark訪問DLA的元數據時,只支持外表,因此對于
SparkJDBCOperator
,cmd_type='jdbc_to_spark'
并且save_mode="overwrite"
的方式不支持。說明DLA Spark訪問自建Hive集群的元數據時,不存在該問題。關于如何訪問自建Hive元數據,請參見Hive。
如果您當前使用的是Airflow調度Livy的方式,目前還是需要改造成命令行的形式。DLA Spark團隊正在開發Livy兼容版本,以降低遷移成本,具體請聯系專家服務。