使用PyJindo訪問阿里云OSS-HDFS
本文將以兩種方式為您介紹如何在Python 3.6及更高版本中,利用Python的工具包PyJindo來操作OSS-HDFS。
背景信息
方式一:直接使用PyJindo包是直接使用PyJindo包的原生接口來操作OSS-HDFS。這種方式更依賴于PyJindo自己的API,且通常需要更深入了解PyJindo包的特定函數(shù)和類。
方式二:使用fsspec接口則是通過fsspec
接口來操作OSS-HDFS,使用PyJindo包中實(shí)現(xiàn)了fsspec
協(xié)議的JindoOssFileSystem
類。這種方式對(duì)于已經(jīng)熟悉fsspec
或希望在不同存儲(chǔ)系統(tǒng)間無縫切換的用戶來說,無疑提供了極大的便利與靈活性。
總結(jié)來說,上述兩種方式都能夠有效地實(shí)現(xiàn)對(duì)OSS-HDFS的互動(dòng)操作,但它們的接口風(fēng)格和集成方式有所不同。您可以根據(jù)自己的需求和偏好選擇使用方式一還是方式二。
前提條件
已創(chuàng)建且登錄集群,詳情請(qǐng)參見創(chuàng)建集群和登錄集群。
方式一:直接使用PyJindo包
直接使用PyJindo中所提供的原生API接口和類實(shí)現(xiàn)對(duì)OSS-HDFS的深度操作。日志級(jí)別和API相關(guān)內(nèi)容,請(qǐng)參見日志等級(jí)和API說明。
步驟一:安裝PyJindo
EMR-5.17.x及之后版本、EMR-3.51.x及之后版本
自EMR-5.17.x及后續(xù)版本,以及EMR-3.51.x及其后續(xù)版本起,在創(chuàng)建的集群中已預(yù)裝了Python 3.8版本的PyJindo庫(kù)。因此,您無需進(jìn)行手動(dòng)安裝,可直接跳過該步驟。
EMR-5.17.x之前版本、EMR-3.51.x之前版本
在下載頁面,下載最新的tar.gz包。
本文示例是下載6.3.x目錄下6.3.2版本的tar.gz包。例如,jindosdk-6.3.2-linux.tar.gz。
解壓縮下載好的tar.gz包,并在以下目錄結(jié)構(gòu)中找到PyJindo安裝包。
本示例以Python 3.8環(huán)境為例。例如,pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl。
. ├── bin │ ├── xxx ├── conf │ ├── xxx ├── include │ ├── xxx ├── lib │ ├── xxx │ ├── native │ │ ├── xxxx │ └── site-packages │ ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl │ └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl ├── plugins │ └── xxxx ├── tools │ ├── xxx └── versions ├── xxx
上傳pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl到目標(biāo)服務(wù)器。
本示例上傳至您EMR集群的home目錄中。登錄集群Master節(jié)點(diǎn)的具體操作,請(qǐng)參見登錄集群。
(可選)確認(rèn)環(huán)境變量。
EMR環(huán)境:默認(rèn)存在以下環(huán)境變量,無需配置。
非EMR環(huán)境:配置方式請(qǐng)參見在非EMR集群中部署JindoSDK。其中,Hadoop配置文件及HADOOP_CONF_DIR不是必須,僅為兼容HADOOP環(huán)境中的配置。
export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
安裝和升級(jí)pip及PyJindo安裝包。
python3.8 -m ensurepip python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/ python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl
步驟二:編寫并執(zhí)行測(cè)試文件
編寫測(cè)試文件fs_test.py。
from pyjindo import fs # 阿里云OSS的Bucket名稱,請(qǐng)根據(jù)實(shí)際情況替換。 bucket = "jindosdk-****" # 阿里云OSS-HDFS的Endpoint,請(qǐng)根據(jù)實(shí)際情況替換。 endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com" root_path = "oss://" + endpoint + "/" sub_dir = root_path + "pyjindotest/" file_path = root_path + "hello.txt" file_path2 = sub_dir + "hello.txt" config = fs.read_config() fs = fs.connect(root_path, "root", config) # 使用fs.open()函數(shù)以二進(jìn)制寫入模式打開指定路徑的文件,如果文件不存在則創(chuàng)建新文件。 out_file = fs.open(file_path, "wb") # 寫入數(shù)據(jù)。 out_file.write(str.encode("hello world, pyjindo")) out_file.close() in_file = fs.open(file_path, "rb") # 讀取文件全部?jī)?nèi)容并保存在變量data中。 data = in_file.read() print("寫入的數(shù)據(jù)為%s." % (data)) in_file.close() # 列出文件。 ls_file = fs.listdir(root_path) print("目錄文件為%s." % (ls_file)) # 創(chuàng)建目錄。 fs.mkdir(sub_dir) # 移動(dòng)并重命名文件。 fs.rename(file_path, file_path2) # 列出文件。 mv_file = fs.listdir(sub_dir) print("移動(dòng)后的目錄文件為%s." % (mv_file)) # 刪除測(cè)試文件,重新列出文件。 fs.remove(file_path2) de_file = fs.listdir(sub_dir) print("刪除文件后的pyjindotest目錄下文件為%s." % (de_file))
執(zhí)行測(cè)試文件。
python3.8 fs_test.py
執(zhí)行結(jié)果如下所示。
寫入的數(shù)據(jù)為b'hello world, pyjindo'. 目錄文件為[<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt': type=File, size=20>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/': type=Directory>, <FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/': type=Directory>]. 移動(dòng)后的目錄文件為[<FileInfo for 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt': type=File, size=20>]. 刪除文件后的pyjindotest目錄下文件為[].
方式二:使用fsspec接口
通過使用fsspec這一通用文件系統(tǒng)接口標(biāo)準(zhǔn),您可以方便地使用集成的JindoOssFileSystem類來與阿里云OSS-HDFS進(jìn)行交互。日志級(jí)別和API相關(guān)內(nèi)容,請(qǐng)參見日志等級(jí)和API說明。更多接口上說明,請(qǐng)參見fsspec。
步驟一:安裝PyJindo
EMR-5.17.x及之后版本、EMR-3.51.x及之后版本
自EMR-5.17.x及后續(xù)版本,以及EMR-3.51.x及其后續(xù)版本起,在創(chuàng)建的集群中已預(yù)裝了Python 3.8版本的PyJindo庫(kù)。因此,您無需進(jìn)行手動(dòng)安裝,可直接跳過該步驟。
EMR-5.17.x之前版本、EMR-3.51.x之前版本
在下載頁面,下載最新的tar.gz包。
本文示例是下載6.3.x目錄下6.3.2版本的tar.gz包。例如,jindosdk-6.3.2-linux.tar.gz。
解壓縮下載好的tar.gz包,并在以下目錄結(jié)構(gòu)中找到PyJindo安裝包。
本示例以Python 3.8環(huán)境為例。例如,pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl。
. ├── bin │ ├── xxx ├── conf │ ├── xxx ├── include │ ├── xxx ├── lib │ ├── xxx │ ├── native │ │ ├── xxxx │ └── site-packages │ ├── pyjindo-x.y.z-cp310-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp311-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp312-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp36-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp37-abi3-linux_x86_64.whl │ ├── pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl │ └── pyjindo-x.y.z-cp39-abi3-linux_x86_64.whl ├── plugins │ └── xxxx ├── tools │ ├── xxx └── versions ├── xxx
上傳pyjindo-x.y.z-cp38-abi3-linux_x86_64.whl到目標(biāo)服務(wù)器。
本示例上傳至您EMR集群的home目錄中。登錄集群Master節(jié)點(diǎn)的具體操作,請(qǐng)參見登錄集群。
(可選)確認(rèn)環(huán)境變量。
EMR環(huán)境:默認(rèn)存在以下環(huán)境變量,無需配置。
非EMR環(huán)境:配置方式請(qǐng)參見在非EMR集群中部署JindoSDK。其中,Hadoop配置文件及HADOOP_CONF_DIR不是必須,僅為兼容HADOOP環(huán)境中的配置。
export JINDOSDK_CONF_DIR=/etc/taihao-apps/jindosdk-conf export HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf
安裝和升級(jí)pip及PyJindo安裝包。
python3.8 -m ensurepip python3.8 -m pip install pip --upgrade --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/ python3.8 -m pip install /home/pyjindo-6.3.2-cp38-abi3-linux_x86_64.whl
步驟二:安裝依賴fsspec
本文以Python3.8環(huán)境安裝fssepc為例,安裝命令如下所示。
python3.8 -m pip install fsspec --trusted-host mirrors.aliyun.com -i http://mirrors.aliyun.com/pypi/simple/
步驟三:編寫并執(zhí)行測(cè)試文件
編寫測(cè)試文件ossfs_test.py。
from pyjindo.ossfs import JindoOssFileSystem # 阿里云OSS的Bucket名稱,請(qǐng)根據(jù)實(shí)際情況替換。 bucket = "jindosdk-****" # 阿里云OSS-HDFS的Endpoint,請(qǐng)根據(jù)實(shí)際情況替換。 endpoint = bucket + ".cn-****.oss-dls.aliyuncs.com" root_path = "oss://" + endpoint + "/" sub_dir = root_path + "pyjindotest/" file_path = root_path + "hello.txt" file_path2 = sub_dir + "hello.txt" fs = JindoOssFileSystem(root_path) # 使用fs.open()函數(shù)以二進(jìn)制寫入模式打開指定路徑的文件,如果文件不存在則創(chuàng)建新文件。 out_file = fs.open(file_path, "wb") # 寫入數(shù)據(jù)。 out_file.write(str.encode("hello world, pyjindo")) out_file.close() in_file = fs.open(file_path, "rb") # 讀取文件全部?jī)?nèi)容并保存在變量data中。 data = in_file.read() print("寫入的數(shù)據(jù)為%s." % (data)) in_file.close() # 列出文件。 ls_file = fs.ls(root_path, detail=False) print("目錄文件為%s." % (ls_file)) assert file_path in fs.glob(root_path + "*") # 創(chuàng)建目錄。 fs.mkdir(sub_dir) # 移動(dòng)并重命名文件。 fs.rename(file_path, file_path2) # 列出文件。 mv_file = fs.listdir(sub_dir, detail=False) print("移動(dòng)后的目錄文件為%s." % (mv_file)) # 刪除測(cè)試文件,重新列出文件。 fs.rm(file_path2) de_file = fs.ls(sub_dir) print("刪除文件后的pyjindotest目錄下文件為%s." % (de_file))
執(zhí)行測(cè)試文件。
python3.8 ossfs_test.py
執(zhí)行結(jié)果如下所示。
寫入的數(shù)據(jù)為b'hello world, pyjindo'. 目錄文件為['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/.sysinfo/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/apps/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/flume/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hbase/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/hello.txt', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyarrowtest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/spark-history/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/test/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/tmp/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/user/', 'oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/yarn/']. 移動(dòng)后的目錄文件為['oss://jindosdk-****-sh.cn-****.oss-dls.aliyuncs.com/pyjindotest/hello.txt']. 刪除文件后的pyjindotest目錄下文件為[].
日志等級(jí)
調(diào)整JINDOSDK_CONF_DIR下的jindosdk.cfg配置,EMR環(huán)境中對(duì)應(yīng)/etc/taihao-apps/jindosdk-conf。
[common]
logger.dir = /var/log/emr/jindosdk
logger.level = 2
logger.verbose = 0
logger.sync = false
logger.jnilogger = true
logger.consolelogger = false
logger.cleaner.enable = true
配置項(xiàng) | 描述 |
logger.dir | 日志文件的存儲(chǔ)目錄。 |
logger.level | 日志等級(jí)。推薦設(shè)置為2,這通常意味著記錄INFO級(jí)別以上的日志。設(shè)置小于等于1,表示設(shè)置日記級(jí)別為WARN及以上。 |
logger.verbose | 詳細(xì)日志的等級(jí),數(shù)字越大,記錄的日志越詳細(xì)。取值范圍為0~99。 |
logger.sync | 是否同步輸出日志。推薦為false,表示不同步輸出日志。 |
logger.jnilogger | 與Java Native Interface (JNI) 相關(guān)的日志設(shè)置,與PyJindo無關(guān)。 |
logger.consolelogger | 是否在終端輸出日志,通常用于調(diào)試,與PyJindo無關(guān)。 |
logger.cleaner.enable | 是否開啟日志自動(dòng)清理功能。這個(gè)功能會(huì)定期清理舊的日志文件,以避免占用過多磁盤空間。推薦為true,表示開啟日志自動(dòng)清理。 |
API說明
Config類
成員函數(shù) | 返回值類型 | 描述 |
set(key, val) | 無 | 設(shè)置字符串類型的配置項(xiàng),其中key和val都是字符串。 |
get(key, default='') | str | 獲取字符串類型的配置項(xiàng)值。 |
contains(key) | bool | 檢查配置中是否存在指定的key。 |
FileType枚舉
枚舉類型 | 枚舉值 | 描述 |
Unknown | 0 | 未知類型或者無法識(shí)別的文件。 |
Directory | 1 | 目錄。 |
File | 2 | 文件。 |
Symlink | 3 | 軟鏈接。 |
FileInfo類
成員屬性 | 返回值類型 | 描述 |
type | FileType | 文件類型。 |
is_file | bool | 表示是否為文件。 |
is_dir | bool | 表示是否為目錄。 |
is_symlink | bool | 表述是否為軟鏈接。 |
path | str | 表示文件的完整路徑。 |
user | str | 表示文件的所有者用戶名。 |
group | str | 表示文件所屬的用戶組名。 |
size | int | 表示文件大小。 |
perm | int | 表示文件的權(quán)限位。 |
atime | datetime | 表示文件最后一次訪問的時(shí)間。 |
mtime | datetime | 表示文件最后一次修改的時(shí)間。 |
FileStream類
成員函數(shù) | 返回值類型 | 描述 |
readable() | bool | 表示該文件流是否可讀。 |
writable() | bool | 表示該文件流是否可寫。 |
seekable() | bool | 表示該文件流是否支持隨機(jī)訪問(即能否通過seek()函數(shù)改變文件讀寫位置)。 |
closed() | bool | 表示該文件流是否已關(guān)閉。 |
close() | 無 | 關(guān)閉當(dāng)前文件流。在關(guān)閉過程中如果發(fā)生錯(cuò)誤,則拋出IOError異常。 |
size() | int | 表示文件大小(僅當(dāng)文件可讀時(shí)可用)。若失敗則拋出IOError異常。 |
tell() | int | 當(dāng)前文件流的位置。若失敗則拋出IOError異常。 |
flush() | 無 | 將緩沖區(qū)中的數(shù)據(jù)強(qiáng)制寫入到文件中。若失敗則拋出IOError異常。 |
write(data) | 無 | 接收一個(gè)bytes類型的數(shù)據(jù),并將其寫入到文件中。若失敗則拋出IOError異常。 |
read(nbytes) | bytes | 從文件中讀取指定大小(nbytes)的數(shù)據(jù),返回一個(gè)bytes對(duì)象。若失敗則拋出IOError異常。 |
pread(nbytes, offset) | bytes | 從文件的特定偏移量(offset)處開始,讀取指定大小(nbytes)的數(shù)據(jù)。若失敗則拋出IOError異常。 |
readall() | bytes | 讀取整個(gè)文件內(nèi)容。若失敗則拋出IOError異常。 |
download(stream_or_path, buffer_size) | 無 | 從當(dāng)前文件流下載內(nèi)容,并將其寫入到本地路徑或目標(biāo)流中。stream_or_path參數(shù)可以是本地路徑也可以是另一個(gè)文件流。若失敗則拋出IOError異常。 |
upload(stream, buffer_size) | 無 | 從給定的stream流中讀取數(shù)據(jù)并寫入到當(dāng)前文件中。若失敗則拋出IOError異常。 |
FileSystem類型
成員函數(shù) | 返回值類型 | 描述 |
mkdir(path, recursive) | bool | 創(chuàng)建指定路徑的目錄,如果recursive參數(shù)為True,則會(huì)遞歸創(chuàng)建所有不存在的父目錄。若失敗則拋出IOError異常。 |
rename(src, dest) | bool | 將src路徑的文件或目錄重命名為dest路徑。若失敗則拋出IOError異常。 |
get_file_info(path) | FileInfo | 獲取指定路徑文件的詳細(xì)信息。若失敗則拋出IOError異常。 |
exists(path) | bool | 檢查指定路徑的文件或目錄是否存在。若失敗則拋出IOError異常。 |
listdir(path, recursive) | FileInfo列表 | 列舉指定路徑下的文件或子目錄信息。若recursive為True,則遞歸列出所有子目錄下的文件信息。若失敗則拋出IOError異常。 |
chmod(path, perm) | bool | 修改指定路徑文件或目錄的權(quán)限,類似setPermission。perm參數(shù)是八進(jìn)制表示的權(quán)限碼,如0o777。若失敗則拋出IOError異常。 |
chown(path, owner, group) | bool | 改變指定路徑文件或目錄的所有者和所屬組,類似setOwner。owner和group分別代表新的用戶名和用戶組名。若失敗則拋出IOError異常。 |
open(path, mode, buffer_size=None) | FileStream | 打開指定路徑的文件,mode支持 |
download(path, stream_or_path, buffer_size=None) | 無 | 從遠(yuǎn)程文件系統(tǒng)下載指定路徑的文件到本地,stream_or_path可以是一個(gè)本地文件路徑或一個(gè)文件流。buffer_size默認(rèn)為64KB,實(shí)際使用時(shí)如果配置中存在fs.oss.read.buffer.size,則優(yōu)先采用該配置值來確定讀取緩沖區(qū)大小。若失敗則拋出IOError異常。 |
upload(path, stream, buffer_size=None) | 無 | 將給定的流stream上傳至遠(yuǎn)程文件系統(tǒng)指定的路徑。buffer_size默認(rèn)為64KB,實(shí)際使用時(shí)如果配置中存在fs.oss.write.buffer.size,則優(yōu)先采用該配置值來確定寫入緩沖區(qū)大小。若失敗則拋出IOError異常。 |
copy_file(src, dest, buffer_size=None) | 無 | 在文件系統(tǒng)內(nèi)部拷貝文件,從src路徑拷貝到dest路徑。buffer_size默認(rèn)為64KB,實(shí)際使用時(shí)如果配置中存在fs.oss.read.buffer.size,則優(yōu)先采用該配置值來確定讀取緩沖區(qū)大小。若失敗則拋出IOError異常。 |
fs模塊
全局函數(shù) | 返回值類型 | 描述 |
read_config() | Config | 用于讀取配置信息。 它會(huì)優(yōu)先檢查環(huán)境變量 |
connect(uri, user, config) | FileSystem | 初始化FileSystem。若失敗則拋出IOError異常。 |