Start a Spark job
In addition to the console, you can also submit Spark jobs through the API. Alibaba Cloud provides SDKs in multiple languages that wrap the API. This topic describes how to submit a Spark job through the API by using the Python SDK.
Prerequisites
An AccessKey pair has been created. For more information, see Create an AccessKey pair.
Note: To avoid the security risks caused by leaking the AccessKey pair of your Alibaba Cloud account (root account), we recommend that you create a RAM user, grant the RAM user the permissions required to access EMR Serverless Spark, and then call the SDK with the RAM user's AccessKey pair. For more information, see:
To create a RAM user and the corresponding AccessKey pair, see Create a RAM user or Create an AccessKey pair.
To grant permissions to the RAM user, see Grant permissions to a RAM user.
A Python 3 environment is available.
Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are set in the environment where your code runs (a minimal check is sketched after this list). For the configuration procedure, see Configure environment variables on Linux, macOS, and Windows.
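If you are unsure whether the two variables are visible to your Python process, the following optional check (for illustration only; it reads the same variable names that the sample code in this topic relies on) fails fast when either credential is missing:
import os

# Illustrative check only: stop early if either credential variable is missing.
for name in ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET"):
    if not os.environ.get(name):
        raise EnvironmentError(f"Environment variable {name} is not set.")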
Install the EMR Serverless Spark Python SDK
Run the following command to install the Python SDK:
pip install alibabacloud_emr_serverless_spark20230808==1.0.0
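To confirm that the package is available in the current interpreter, you can try importing the client class that the sample below uses. This is only a quick sanity check, not part of the official procedure:
# Quick sanity check: the import fails if the SDK is not installed.
from alibabacloud_emr_serverless_spark20230808.client import Client
print("EMR Serverless Spark SDK client imported:", Client.__name__)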
Sample code
The complete sample code of this topic is shown below. You can modify it based on your actual environment.
For more information about EMR Serverless Spark endpoints, see Endpoints.
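The sample defines three functions: example_jar submits a JAR job (SparkPi), example_sql submits a Spark SQL job, and example_py submits a PySpark job. Only example_jar() is invoked at the bottom of the script; uncomment the other calls to run them. Before running the code, replace the placeholders, such as <YourBucket>, the workspace ID w-ae42e9c92927****, and, if needed, the resource queue ID, with your own values.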
# -*- coding: utf-8 -*-
import os
from typing import List

from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_emr_serverless_spark20230808.client import Client
from alibabacloud_emr_serverless_spark20230808.models import (
    StartJobRunRequest,
    Tag,
    JobDriver,
    JobDriverSparkSubmit,
)
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


# Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET
# environment variables are set in the environment where this code runs.
# Leaking project code may expose your AccessKey pair and threaten the security of all
# resources in your account. The following sample reads the AccessKey pair from
# environment variables for reference only; the more secure STS method is recommended.
# Replace cn-hangzhou in the endpoint with the ID of a region supported by EMR Serverless Spark.
def create_client() -> Client:
    config = open_api_models.Config(
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    config.endpoint = 'emr-serverless-spark.cn-hangzhou.aliyuncs.com'
    return Client(config)


def example_jar():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    # Entry point JAR, entry point arguments, and spark-submit parameters.
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        ["1"],
        "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )
    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        # Replace 'w-ae42e9c92927****' with your workspace ID.
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers, runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)


def example_sql():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    # For SQL jobs, the SQL file is passed to the Spark SQL CLI driver with -f.
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql",
        ["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        "--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )
    job_driver = JobDriver(job_driver_spark_submit)
    # configuration_overrides = StartJobRunRequestConfigurationOverrides([StartJobRunRequestConfigurationOverridesConfigurations("test", "test", "test")])
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-sql-test",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver,
        # configuration_overrides=configuration_overrides
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers, runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)


def example_py():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    # Entry point script, script arguments, and spark-submit parameters.
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        ["50"],
        "--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )
    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers, runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    example_jar()
    # example_sql()
    # example_py()
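If the call succeeds, start_job_run_with_options returns a response whose body is printed as a dictionary via response.body.to_map(); it typically includes the ID of the newly started job run, which you can use to locate the run in the EMR Serverless Spark console. If the call fails, the except branch prints the error message and, when available, the recommended troubleshooting link returned by the service.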