PySpark可直接調(diào)用Python的API運(yùn)行Spark作業(yè),PySpark作業(yè)需在特定Python環(huán)境中運(yùn)行。EMR默認(rèn)支持使用Python,若EMR支持的Python版本無法運(yùn)行PySpark作業(yè),則您可參考本實(shí)踐配置可用的Python環(huán)境并在DataWorks上運(yùn)行PySpark作業(yè)。
前提條件
執(zhí)行本實(shí)踐所使用的DataWorks及E-MapReduce(簡(jiǎn)稱EMR)需部署在相同地域。產(chǎn)品各自需執(zhí)行的前提條件如下:操作步驟
- 可選:準(zhǔn)備運(yùn)行Python程序需要的虛擬環(huán)境。
您可選擇直接下載本實(shí)踐的示例包
python3.7使用;或通過如下步驟自主打包Python環(huán)境。
- 制作Docker鏡像。
您可選擇直接下載本實(shí)踐的示例Dockerfile文件至本地或ECS;或在安裝了Docker環(huán)境的宿主機(jī)上新建一個(gè)Dockerfile文件。Dockerfile文件的內(nèi)容如下。
FROM centos:centos7.9.2009
RUN set -ex \
# 預(yù)安裝所需組件。
&& yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\
&& wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \
&& tar -zxvf Python-3.7.0.tgz \
&& cd Python-3.7.0 \
&& ./configure prefix=/usr/local/python3 \
&& make \
&& make install \
&& make clean \
&& rm -rf /Python-3.7.0* \
&& yum install -y epel-release \
&& yum install -y python-pip
# 設(shè)置默認(rèn)為python3。
RUN set -ex \
# 備份舊版本python。
&& mv /usr/bin/python /usr/bin/python27 \
&& mv /usr/bin/pip /usr/bin/pip-python27 \
# 配置默認(rèn)為python3。
&& ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \
&& ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
# 修復(fù)因修改python版本導(dǎo)致yum失效問題。
RUN set -ex \
&& sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \
&& sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \
&& yum install -y deltarpm
# 更新pip版本。
RUN pip install --upgrade pip
- 構(gòu)建鏡像并運(yùn)行容器。
在Dockerfile文件所在路徑下,執(zhí)行如下命令。
sudo docker build -t python-centos:3.7 .
sudo docker run -itd --name python3.7 python-centos:3.7
- 進(jìn)入安裝容器所需的Python依賴庫并打包Python環(huán)境。
sudo docker exec -it python3.7 bash
pip install [所需依賴庫]
# vi requirements.txt
# pip install -r requirements.txt
# numpy
# pandas
cd /usr/local/
zip -r python3.7.zip python3/
- 拷貝容器中的Python環(huán)境到宿主機(jī)。
# 在宿主機(jī)運(yùn)行命令將虛擬環(huán)境拷貝到宿主機(jī)。
sudo docker cp python3.7:/usr/local/python3.7.zip .
- 上傳虛擬環(huán)境。
您可根據(jù)需要,選擇上傳Python虛擬環(huán)境至OSS或HDFS。
說明 本實(shí)踐以上傳至HDFS示例。如果您選擇上傳至OSS,操作詳情請(qǐng)參見
上傳文件。
上傳Python環(huán)境至HDFS命令如下。
# 上傳至HDFS中。
hdfs dfs -copyFromLocal python3.7.zip /tmp/pyspark
- 測(cè)試并上傳Python代碼。
- 您可在本地或ECS中創(chuàng)建一個(gè)
py
文件,按照下述方法測(cè)試Python代碼是否正確。本實(shí)踐示例使用pyspark_test.py
文件測(cè)試。# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession
def noop(x):
import socket
import sys
host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)
print('host: ' + host)
print('PYTHONPATH: ' + os.environ['PYTHONPATH'])
print('PWD: ' + os.environ['PWD'])
print(os.listdir('.'))
return host
if __name__ == '__main__':
spark = SparkSession \
.builder \
.appName("test_pyspark") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
# 驗(yàn)證系統(tǒng)當(dāng)前環(huán)境變量。
rdd = sc.parallelize(range(10), 2)
hosts = rdd.map(noop).distinct().collect()
print(hosts)
# 驗(yàn)證UDF。
# https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#
# spark.udf.register("udf_squared", udf_squared)
# spark.udf.register("udf_numpy", udf_numpy)
tableName = "store"
df = spark.sql("""select count(*) from %s """ % tableName)
print("rdf count, %s\n" % df.count())
df.show()
- 上傳Python代碼至HDFS中。
參考如下命令,在EMR實(shí)例中上傳Python代碼至HDFS。
說明 本實(shí)踐以上傳至HDFS示例。如果您選擇上傳至OSS,操作詳情請(qǐng)參見
上傳文件。
hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
- 在DataWorks中通過
spark-submit
命令提交作業(yè)。在創(chuàng)建的
EMR Spark節(jié)點(diǎn)中,使用如下命令提交作業(yè)。
說明 如果您選擇上傳Python代碼至OSS,則需替換為實(shí)際使用的OSS路徑。
spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \
--conf spark.executorEnv.PYTHONPATH=. \
--conf spark.yarn.appMasterEnv.PYTHONPATH=. \
--conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \
--archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \
## --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip \
--driver-memory 4g \
--driver-cores 1 \
--executor-memory 4g \
--executor-cores 1 \
--num-executors 3 \
--name TestPySpark \
hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py