通過Apache Airflow使用Livy Operator提交任務(wù)
Apache Airflow是一個(gè)強(qiáng)大的工作流程自動(dòng)化和調(diào)度工具,它允許開發(fā)者編排、計(jì)劃和監(jiān)控?cái)?shù)據(jù)管道的執(zhí)行。EMR Serverless Spark為處理大規(guī)模數(shù)據(jù)處理任務(wù)提供了一個(gè)無服務(wù)器計(jì)算環(huán)境。本文為您介紹如何通過Apache Airflow的Livy Operator實(shí)現(xiàn)自動(dòng)化地向EMR Serverless Spark提交任務(wù),以實(shí)現(xiàn)任務(wù)調(diào)度和執(zhí)行的自動(dòng)化,幫助您更有效地管理數(shù)據(jù)處理任務(wù)。
背景信息
Apache Livy通過REST接口與Spark進(jìn)行交互,極大簡(jiǎn)化了Spark和應(yīng)用程序服務(wù)器之間的通信復(fù)雜度。關(guān)于Livy API,請(qǐng)參見REST API。
前提條件
已安裝并啟動(dòng)Airflow服務(wù),詳情請(qǐng)參見Installation of Airflow。
已創(chuàng)建工作空間,詳情請(qǐng)參見創(chuàng)建工作空間。
操作步驟
步驟一:創(chuàng)建Gateway及訪問Token
創(chuàng)建Gateway。
進(jìn)入Gateway頁面。
在左側(cè)導(dǎo)航欄,選擇
。在Spark頁面,單擊目標(biāo)工作空間名稱。
在EMR Serverless Spark頁面,單擊左側(cè)導(dǎo)航欄中的
。
在Livy Gateway頁面,單擊創(chuàng)建Livy Gateway。
在創(chuàng)建Gateway頁面,輸入名稱(例如,Livy-gateway),單擊創(chuàng)建。
其余參數(shù)請(qǐng)根據(jù)具體情況進(jìn)行調(diào)整,更多參數(shù)信息請(qǐng)參見管理Gateway。
創(chuàng)建Token。
在Gateway頁面,單擊Livy-gateway操作列的Token管理。
單擊創(chuàng)建Token。
在創(chuàng)建Token對(duì)話框中,輸入名稱(例如,Livy-token),單擊確定。
復(fù)制Token信息。
重要Token創(chuàng)建完成后,請(qǐng)務(wù)必立即復(fù)制新Token的信息,后續(xù)不支持查看。如果您的Token過期或遺失,請(qǐng)選擇新建Token或重置Token。
步驟二:配置Apache Airflow
執(zhí)行以下命令,在Apache Airflow環(huán)境中安裝Apache Livy。
pip install apache-airflow-providers-apache-livy
添加Connection。
UI方式
在Airflow中找到默認(rèn)為livy_default的Connection,并對(duì)其信息進(jìn)行修改;或者您也可以在Airflow Web頁面手動(dòng)添加Connection,詳情請(qǐng)參見創(chuàng)建Connection。
涉及以下信息:
Host:填寫為Gateway中的Endpoint信息。
Schema:填寫為https。
Extra:填寫JSON字符串,
x-acs-spark-livy-token
為您前一個(gè)步驟中復(fù)制的Token信息。{ "x-acs-spark-livy-token": "6ac**********kfu" }
CLI方式
通過Airflow CLI執(zhí)行相應(yīng)命令來建立Connection,詳情請(qǐng)參見創(chuàng)建Connection。
airflow connections add 'livy_default' \ --conn-json '{ "conn_type": "livy", "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx", # Gateway中的Endpoint信息。 "schema": "https", "extra": { "x-acs-spark-livy-token": "6ac**********kfu" # 為您前一個(gè)步驟中復(fù)制的Token信息。 } }'
步驟三: 使用Livy Operator提交Spark任務(wù)
Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務(wù)執(zhí)行的方式,以下是通過Airflow使用Livy Operator執(zhí)行Spark任務(wù)的示例。
從阿里云OSS獲取并執(zhí)行Python腳本文件。
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'aliyun',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id="livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval=None,
start_date=datetime(2024, 5, 20),
tags=['example', 'spark', 'livy'],
catchup=False
)
# define livy task with LivyOperator
# 請(qǐng)根據(jù)實(shí)際情況替換file內(nèi)容。
livy_sparkpi_submit_task = LivyOperator(
file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
class_name="org.apache.spark.examples.SparkPi",
args=['1000'],
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
file
為您的Spark任務(wù)對(duì)應(yīng)的文件路徑,本文示例為上傳至阿里云OSS上的JAR包spark-examples_2.12-3.3.1.jar的路徑,請(qǐng)您根據(jù)實(shí)際情況替換。上傳操作可參見簡(jiǎn)單上傳。
步驟四:查看提交至EMR的任務(wù)
在EMR Serverless Spark頁面,單擊左側(cè)導(dǎo)航欄中的任務(wù)歷史。
在任務(wù)歷史的開發(fā)任務(wù)頁簽,您可以查看提交的任務(wù)。
相關(guān)文檔
在Apache Airflow中,您也可以選擇使用EMR提供的EmrServerlessSparkStartJobRunOperator
接口來提交EMR Serverless Spark任務(wù),提供了一種除了Livy之外的便捷途徑。更多詳情,請(qǐng)參見通過Apache Airflow向EMR Serverless Spark提交任務(wù)。