日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Python作業開發實踐

Lindorm計算引擎通過HTTP RESTful API的方式提供Spark Python作業提交入口,您可以按照這種方式運行流批任務、機器學習和圖計算任務。本文介紹Lindorm計算引擎Python作業開發的詳細步驟。

前提條件

已開通Lindorm計算引擎,具體操作請參見開通與變配

Spark Python作業開發流程

  1. 準備Spark Python作業

  2. 打包Spark Python作業

  3. 上傳Spark Python作業

  4. 提交Spark Python作業

步驟一:準備Spark Python作業

  1. 下載Spark Python作業示例壓縮包,下載鏈接為Spark Python作業示例

  2. 解壓Spark Python作業示例壓縮包,解壓后的目錄名稱為lindorm-spark-examples。打開lindorm-spark-examples/python目錄,參考python目錄結構。

  3. 項目開發的根目錄以your_project為例,介紹項目的目錄結構。

    1. your_project目錄下新建__init__.py文件,內容為空。

    2. 改造入口文件。

      1. 在入口文件中編寫代碼,將your_project添加到sys.path中,代碼詳情請參見Spark Python作業示例中的lindorm-spark-examples/python/your_project/main.py文件的Notice1部分。

        # Notice1: You need to do the following step to complete the code modification:
        # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py
        # Step2: Please add current dir to sys.path, you should add the following code to your main file
        current_dir = os.path.abspath(os.path.dirname(__file__))
        sys.path.append(current_dir)
        print("current dir in your_project: %s" % current_dir)
        print("sys.path: %s \n" % str(sys.path))
      2. 在入口文件中將入口邏輯封裝到main(argv)方法中。代碼詳情請參見Spark Python作業示例中的lindorm-spark-examples/python/your_project/main.py文件的Notice2部分。

        # Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function,
        # so that launcher.py in parent directory just call main(sys.argv)
        def main(argv):
            print("Receive arguments: %s \n" % str(argv))
        
            print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__)))
            # Write your code here
        
        
        if __name__ == "__main__":
            main(sys.argv)
    3. 創建Spark Python作業啟動的入口文件,用來調用main(argv)方法。在根目錄your_project創建同級目錄launcher.py,可以復制Spark Python作業示例中的lindorm-spark-examples/python/launcher.py文件。

步驟二:打包Spark Python作業

  1. 打包項目依賴的Python環境和第三方類庫。推薦使用Conda或者Virtualenv將依賴類庫打包為tar包,具體操作請參見Python Package Management

    重要
    • 使用Conda或者Virtualenv打的tar包通過spark.archives傳遞,可以是spark.archives支持的所有格式。詳細說明,請參見spark.archives

    • 請在Linux環境下完成該操作,以保證Lindorm計算引擎能正常識Python二進制文件。

  2. 打包項目文件。將your_project文件打包為.zip或者.egg格式文件。

    • 執行以下命令將項目文件打包為.zip格式文件。

      zip -r project.zip your_project
    • 將項目文件打包為.egg格式文件,具體操作請參見Building Eggs

步驟三:上傳Spark Python作業

將以下文件都上傳至OSS,具體操作請參見上傳文件

  • 步驟二中打包的Python環境和第三方類庫(也就是tar包)。

  • 步驟二中打包的項目文件(也就是.zip或者.egg文件)。

  • 步驟一中的launcher.py文件。

步驟四:提交Spark Python作業

Lindorm計算引擎支持以下兩種方式提交并管理作業。

請求參數包括以下兩個部分:

  • Python作業環境參數說明,示例如下:

    {"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
    • 提交Python作業項目文件(也就是.zip.egg或者.py格式的文件)時,請配置configs參數中的spark.submit.pyFiles

    • 提交Python環境和第三方類庫(也就是tar包)時,請配置configs參數中的spark.archivesspark.kubernetes.driverEnv.PYSPARK_PYTHON

      • 配置spark.archives參數時使用井號(#)指定targetDir

      • 配置spark.kubernetes.driverEnv.PYSPARK_PYTHON參數指定Python文件路徑。

  • 如果將文件上傳至OSS,需要在configs參數中配置以下信息。

    表 1. 配置configs相關參數

    參數

    示例值

    說明

    spark.hadoop.fs.oss.endpoint

    oss-cn-beijing-internal.aliyuncs.com

    存儲Python文件的OSS地址。

    spark.hadoop.fs.oss.accessKeyId

    testAccessKey ID

    通過阿里云控制臺創建的Access Key ID和Access Key Secret,獲取方法請參見創建AccessKey

    spark.hadoop.fs.oss.accessKeySecret

    testAccessKey Secret

    spark.hadoop.fs.oss.impl

    固定值:org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem

    訪問OSS的類。

    說明

    更多參數請參見參數說明

Python作業開發示例

  1. 下載并解壓Spark Python作業示例

  2. 改造入口文件,打開Python目錄下的your_project/main.py文件并配置相關代碼。

    1. your_project目錄添加到sys.path中。

      current_dir = os.path.abspath(os.path.dirname(__file__))
      sys.path.append(current_dir)
      print("current dir in your_project: %s" % current_dir)
      print("sys.path: %s \n" % str(sys.path))
    2. main.py文件中添加入口邏輯,如下示例初始化SparkSession。

      from pyspark.sql import SparkSession
      spark = SparkSession \
          .builder \
          .appName("PythonImportTest") \
          .getOrCreate()
      print(spark.conf)
      spark.stop()
  3. 在Python目錄下打包your_project文件。

    zip -r your_project.zip your_project
  4. 在Linux環境下,使用Conda打包Python運行環境。

    conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz
  5. 將打包好的your_project.zippyspark_conda_env.tar.gz上傳至OSS,并將Python目錄下的launcher.py文件上傳至OSS。

  6. 通過以下兩種方式提交作業。

作業診斷

Python作業提交成功后,可以在作業列表頁面查看作業運行狀況和SparkUI訪問地址,具體操作請參見查看作業。作業提交過程中如果有其他問題,請提交工單并將JobID和WebUI地址提供給工單處理人員。