
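In addition to using the console, you can submit Spark jobs through the API. Alibaba Cloud provides SDKs in multiple languages that wrap the API. This topic uses Python to show how to submit Spark jobs through the API.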

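Prerequisites

  • An AccessKey has been created. For details, see Create an AccessKey.

    Note

    To avoid the security risk of leaking the AccessKey of your Alibaba Cloud account (root account), we recommend that you create a RAM user, grant the RAM user the access permissions related to EMR Serverless Spark, and then call the SDK with the RAM user's AccessKey. For more information, see the related documentation.

  • A Python 3 environment is available.

  • Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the environment where the code runs. For configuration steps, see Configure environment variables on Linux, macOS, and Windows.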
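Before calling the SDK, it can help to confirm that both variables are actually visible to the Python process. The following is a minimal sketch using only the standard library; the helper name `missing_credentials` is hypothetical and is not part of the SDK.

```python
import os

# The two variable names come from the prerequisites above.
REQUIRED_VARS = (
    "ALIBABA_CLOUD_ACCESS_KEY_ID",
    "ALIBABA_CLOUD_ACCESS_KEY_SECRET",
)


def missing_credentials(environ=None):
    """Return the required variable names that are unset or empty.

    `environ` defaults to os.environ; a plain dict can be passed for testing.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("AccessKey environment variables are set.")
```

Running this check first gives a clearer message than the `KeyError` that `os.environ[...]` raises when a variable is absent.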

Install the EMR Serverless Spark Python SDK

Run the following command to install the Python SDK.

pip install alibabacloud_emr_serverless_spark20230808==1.0.0
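To confirm that the installation succeeded, one option is to query the installed distribution's version from Python. This is a sketch under the assumption that the distribution is registered under the same name used in the pip command; the helper name `installed_version` is hypothetical.

```python
from importlib import metadata

# Assumption: the distribution name matches the name passed to pip above.
SDK_DIST = "alibabacloud_emr_serverless_spark20230808"


def installed_version(dist_name=SDK_DIST):
    """Return the installed version string, or None if the package is not found."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version()
    if version is None:
        print(f"{SDK_DIST} is not installed in this environment.")
    else:
        print(f"{SDK_DIST} version: {version}")
```

`importlib.metadata` is part of the standard library in Python 3.8 and later, so the check needs no extra dependencies.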

Sample code

The complete sample code is shown below. Modify it as needed for your environment.

For more information about EMR Serverless Spark endpoints, see Service endpoints.

# -*- coding: utf-8 -*-
import os

from typing import List
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_emr_serverless_spark20230808.client import Client
from alibabacloud_emr_serverless_spark20230808.models import (
    StartJobRunRequest,
    Tag,
    JobDriver,
    JobDriverSparkSubmit,
)

from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


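# Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the environment where the code runs.
# Leaked project code can expose your AccessKey and threaten the security of all resources in your account. This sample reads the AccessKey from environment variables for reference only; the more secure STS method is recommended.
# Replace the region ID in the endpoint with a region ID supported by EMR Serverless Spark.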
def create_client() -> Client:
    config = open_api_models.Config(
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    config.endpoint = 'emr-serverless-spark.cn-hangzhou.aliyuncs.com'
    return Client(config)


def example_jar():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        ["1"],
        "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
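    except Exception as error:
        # SDK errors carry `message` and `data`; fall back to str(error) for other exceptions.
        message = getattr(error, "message", str(error))
        print(message)
        print((getattr(error, "data", None) or {}).get("Recommend"))
        UtilClient.assert_as_string(message)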

def example_sql():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql",
        ["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        "--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    # configuration_overrides = StartJobRunRequestConfigurationOverrides([StartJobRunRequestConfigurationOverridesConfigurations("test", "test", "test")])
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-sql-test",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver,
        # configuration_overrides=configuration_overrides
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
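    except Exception as error:
        # SDK errors carry `message` and `data`; fall back to str(error) for other exceptions.
        message = getattr(error, "message", str(error))
        print(message)
        print((getattr(error, "data", None) or {}).get("Recommend"))
        UtilClient.assert_as_string(message)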

def example_py():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        ["50"],
        "--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
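    except Exception as error:
        # SDK errors carry `message` and `data`; fall back to str(error) for other exceptions.
        message = getattr(error, "message", str(error))
        print(message)
        print((getattr(error, "data", None) or {}).get("Recommend"))
        UtilClient.assert_as_string(message)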


example_jar()
# example_sql()
# example_py()
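The sample hard-codes the endpoint and writes each spark-submit parameter string by hand. As a minimal sketch, the two helpers below assemble those strings from a region ID and a dictionary of Spark settings. Both function names are hypothetical and not part of the SDK, and the endpoint pattern is an assumption generalized from the `cn-hangzhou` value in the sample; check the service endpoints page for the regions that are actually supported.

```python
from typing import Mapping, Optional


def build_endpoint(region_id: str) -> str:
    """Build an endpoint string for a region.

    Assumption: other regions follow the same pattern as the cn-hangzhou sample.
    """
    return f"emr-serverless-spark.{region_id}.aliyuncs.com"


def build_spark_submit_parameters(
    conf: Mapping[str, str], main_class: Optional[str] = None
) -> str:
    """Join an optional --class flag and --conf key=value pairs into one string."""
    parts = []
    if main_class:
        parts.append(f"--class {main_class}")
    # Dictionaries preserve insertion order, so the flags keep the order given.
    parts.extend(f"--conf {key}={value}" for key, value in conf.items())
    return " ".join(parts)


if __name__ == "__main__":
    params = build_spark_submit_parameters(
        {
            "spark.executor.cores": "4",
            "spark.executor.memory": "20g",
            "spark.driver.cores": "4",
            "spark.driver.memory": "8g",
            "spark.executor.instances": "1",
        },
        main_class="org.apache.spark.examples.SparkPi",
    )
    print(build_endpoint("cn-hangzhou"))
    print(params)
```

The resulting string can be passed as the third argument of `JobDriverSparkSubmit`, in place of the literal string used in `example_jar`. For a PySpark job, omit `main_class`, as `example_py` does.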