本文介紹JindoFS的數據遷移工具Jindo DistCp的使用方法。

前提條件

  • 本地安裝了Java JDK 8。
  • 已創建集群,詳情請參見創建集群

使用Jindo Distcp

  1. 通過SSH方式連接集群。

    詳情請參見登錄集群

  2. 執行以下命令,獲取幫助信息。
    jindo distcp --help
    返回信息如下。
         --help           - Print help text
         --src=VALUE          - Directory to copy files from
         --dest=VALUE              - Directory to copy files to
         --parallelism=VALUE         - Copy task parallelism
         --outputManifest=VALUE       - The name of the manifest file
         --previousManifest=VALUE   -   The path to an existing manifest file
         --requirePreviousManifest=VALUE   -   Require that a previous manifest is present if specified
         --copyFromManifest   -   Copy from a manifest instead of listing a directory
         --srcPrefixesFile=VALUE   -   File containing a list of source URI prefixes
         --srcPattern=VALUE   -   Include only source files matching this pattern
         --deleteOnSuccess   -   Delete input files after a successful copy
         --outputCodec=VALUE   -   Compression codec for output files
         --groupBy=VALUE   -   Pattern to group input files by
         --targetSize=VALUE   -   Target size for output files
         --enableBalancePlan   -   Enable plan copy task to make balance
         --enableDynamicPlan   -   Enable plan copy task dynamically
         --enableTransaction   -   Enable transation on Job explicitly
         --diff   -   show the difference between src and dest filelist
         --ossKey=VALUE   -   Specify your oss key if needed
         --ossSecret=VALUE   -   Specify your oss secret if needed
         --ossEndPoint=VALUE   -   Specify your oss endPoint if needed
         --policy=VALUE   -   Specify your oss storage policy
         --cleanUpPending   -   clean up the incomplete upload when distcp job finish
         --queue=VALUE   -   Specify yarn queuename if needed
         --bandwidth=VALUE   -   Specify bandwidth per map/reduce in MB if needed
         --s3Key=VALUE   -   Specify your s3 key
         --s3Secret=VALUE   -   Specify your s3 Sercet
         --s3EndPoint=VALUE   -   Specify your s3 EndPoint
         --enableCMS  -   Enable CMS
         --update   -   Update target, copying only missing files or directories
         --filters=VALUE   -   Specify a path of file containing patterns to exlude source files

--src和--dest

--src表示指定源文件的路徑,--dest表示目標文件的路徑。

Jindo DistCp默認將--src目錄下的所有文件拷貝到指定的--dest路徑下。您可以通過指定--dest路徑來確定拷貝后的文件目錄,如果不指定根目錄,Jindo DistCp會自動創建根目錄。

例如,您可以執行以下命令,將/opt/tmp下的文件拷貝到OSS Bucket。
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp
說明 本文示例中的yourBucketName是您OSS Bucket的名稱。

--parallelism

--parallelism用于指定MapReduce作業里的mapreduce.job.reduces參數。該參數默認為7,您可以根據集群的資源情況,通過自定義--parallelism大小來控制DistCp任務的并發度。

例如,將HDFS上/opt/tmp目錄拷貝到OSS Bucket,可以執行以下命令。
jindo distcp --src /opt/tmp --dest oss://<yourBucketName>/tmp --parallelism 20

--srcPattern

--srcPattern使用正則表達式,用于選擇或者過濾需要復制的文件。您可以編寫自定義的正則表達式來完成選擇或者過濾操作,正則表達式必須為全路徑正則匹配。

例如,如果您需要復制/data/incoming/hourly_table/2017-02-01/03下所有log文件,您可以通過指定--srcPattern的正則表達式來過濾需要復制的文件。

執行以下命令,查看/data/incoming/hourly_table/2017-02-01/03下的文件。
hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
返回信息如下。
Found 6 items
-rw-r-----   2 root hadoop       2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
執行以下命令,復制以log結尾的文件。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPattern .*\.log --parallelism 20
執行以下命令,查看目標bucket的內容。
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03
返回信息如下,顯示只復制了以log結尾的文件。
Found 2 items
-rw-rw-rw-   1       4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log
-rw-rw-rw-   1       4891 2020-04-17 20:52 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log

--deleteOnSuccess

--deleteOnSuccess可以移動數據并從源位置刪除文件。

例如,執行以下命令,您可以將/data/incoming/下的hourly_table文件移動到OSS Bucket中,并刪除源位置文件。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --deleteOnSuccess --parallelism 20

--outputCodec

--outputCodec可以在線高效地存儲數據和壓縮文件。

jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputCodec=gz --parallelism 20
目標文件夾中的文件已經使用gz編解碼器壓縮了。
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03
返回信息如下:
Found 6 items
-rw-rw-rw-   1        938 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst.gz
-rw-rw-rw-   1       1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log.gz
-rw-rw-rw-   1       1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log.gz
-rw-rw-rw-   1       1956 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/OPTIONS-000109.gz
-rw-rw-rw-   1        506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt.gz
-rw-rw-rw-   1        506 2020-04-17 20:58 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp06.txt.gz
Jindo DistCp當前版本支持編解碼器gzip、gz、lzo、lzop、snappy以及關鍵字none和keep(默認值)。關鍵字含義如下:
  • none表示保存為未壓縮的文件。如果文件已壓縮,則Jindo DistCp會將其解壓縮。
  • keep表示不更改文件壓縮形態,按原樣復制。
說明 如果您想在開源Hadoop集群環境中使用編解碼器lzo,則需要安裝gplcompression的native庫和hadoop-lzo包。

--outputManifest和--requirePreviousManifest

--outputManifest可以指定生成DistCp的清單文件,用來記錄copy過程中的目標文件、源文件和數據量大小等信息。

如果您需要生成清單文件,則指定--requirePreviousManifestfalse。當前outputManifest文件默認且必須為gz類型壓縮文件。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-17.gz --requirePreviousManifest=false --parallelism 20
查看outputManifest文件內容。
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz > before.lst
cat before.lst 
返回信息如下。
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/000151.sst","baseName":"2017-02-01/03/000151.sst","srcDir":"oss://<yourBucketName>/hourly_table","size":2252}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/1.log","baseName":"2017-02-01/03/1.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/2.log","baseName":"2017-02-01/03/2.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/OPTIONS-000109","baseName":"2017-02-01/03/OPTIONS-000109","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/emp01.txt","baseName":"2017-02-01/03/emp01.txt","srcDir":"oss://<yourBucketName>/hourly_table","size":1016}
{"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/emp06.txt","baseName":"2017-02-01/03/emp06.txt","srcDir":"oss://<yourBucketName>/hourly_table","size":1016}

--outputManifest和--previousManifest

--outputManifest表示包含所有已復制文件(舊文件和新文件)的列表,--previousManifest表示只包含之前復制文件的列表。您可以使用--outputManifest--previousManifest重新創建完整的操作歷史記錄,查看運行期間復制的文件。

例如,在源文件夾中新增加了兩個文件,命令如下所示。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --outputManifest=manifest-2020-04-18.gz --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --parallelism 20
執行以下命令,查看運行期間復制的文件。
hadoop fs -text oss://<yourBucketName>/hourly_table/manifest-2020-04-18.gz > current.lst
diff before.lst current.lst 
返回信息如下。
3a4,5
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/5.log","baseName":"2017-02-01/03/5.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}
> {"path":"oss://<yourBucketName>/hourly_table/2017-02-01/03/6.log","baseName":"2017-02-01/03/6.log","srcDir":"oss://<yourBucketName>/hourly_table","size":4891}

--copyFromManifest

使用--outputManifest生成清單文件后,您可以使用--copyFromManifest指定--outputManifest生成的清單文件,并將dest目錄生成的清單文件中包含的文件信息拷貝到新的目錄下。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=oss://<yourBucketName>/hourly_table/manifest-2020-04-17.gz --copyFromManifest --parallelism 20

--srcPrefixesFile

--srcPrefixesFile可以一次性完成多個文件夾的復制。

示例如下,查看hourly_table下文件。
hdfs dfs -ls oss://<yourBucketName>/hourly_table
返回信息如下。
Found 4 items
drwxrwxrwx   -          0 1970-01-01 08:00 oss://<yourBucketName>/hourly_table/2017-02-01
drwxrwxrwx   -          0 1970-01-01 08:00 oss://<yourBucketName>/hourly_table/2017-02-02
執行以下命令,復制hourly_table下文件到folders.txt
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --srcPrefixesFile file:///opt/folders.txt --parallelism 20
查看folders.txt文件的內容。
cat folders.txt 
返回信息如下。
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-01
hdfs://emr-header-1.cluster-50466:9000/data/incoming/hourly_table/2017-02-02

--groupBy和-targetSize

因為Hadoop可以從HDFS中讀取少量的大文件,而不再讀取大量的小文件,所以在大量小文件的場景下,您可以使用Jindo DistCp將小文件聚合為指定大小的大文件,以便于優化分析性能和降低成本。

例如,執行以下命令,查看如下文件夾中的數據。
hdfs dfs -ls /data/incoming/hourly_table/2017-02-01/03
返回信息如下。
Found 8 items
-rw-r-----   2 root hadoop       2252 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/000151.sst
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/1.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/2.log
-rw-r-----   2 root hadoop       4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/5.log
-rw-r-----   2 root hadoop       4891 2020-04-17 21:08 /data/incoming/hourly_table/2017-02-01/03/6.log
-rw-r-----   2 root hadoop       4891 2020-04-17 20:42 /data/incoming/hourly_table/2017-02-01/03/OPTIONS-000109
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp01.txt
-rw-r-----   2 root hadoop       1016 2020-04-17 20:47 /data/incoming/hourly_table/2017-02-01/03/emp06.txt
執行以下命令,將如下文件夾中的TXT文件合并為不超過10M的文件。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --targetSize=10 --groupBy='.*/([a-z]+).*.txt' --parallelism 20
經過合并后,可以看到兩個TXT文件被合并成了一個文件。
hdfs dfs -ls oss://<yourBucketName>/hourly_table/2017-02-01/03/
Found 1 items
-rw-rw-rw-   1       2032 2020-04-17 21:18 oss://<yourBucketName>/hourly_table/2017-02-01/03/emp2

--enableBalancePlan

在您要拷貝的數據大小均衡、小文件和大文件混合的場景下,因為DistCp默認的執行計劃是隨機進行文件分配的,所以您可以指定--enableBalancePlan來更改Jindo DistCp的作業分配計劃,以達到更好的DistCp性能。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableBalancePlan --parallelism 20
說明 該參數不支持和--groupby--targetSize同時使用。

--enableDynamicPlan

當您要拷貝的數據大小分化嚴重、小文件數據較多的場景下,您可以指定--enableDynamicPlan來更改Jindo DistCp的作業分配計劃,以達到更好的DistCp性能。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableDynamicPlan --parallelism 20
說明 該參數不支持和--groupby--targetSize參數一起使用。

--enableTransaction

--enableTransaction可以保證Job級別的完整性以及保證Job之間的事務支持。示例如下。

jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --enableTransaction --parallelism 20

--diff

DistCp任務完成后,您可以使用--diff查看當前DistCp的文件差異。

例如,執行以下命令,查看/data/incoming/
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --diff
如果全部任務完成則會提示如下信息。
INFO distcp.JindoDistCp: Jindo DistCp job exit with 0
如果src的文件未能同步到dest上,則會在當前目錄下生成manifest文件,您可以使用--copyFromManifest--previousManifest拷貝剩余文件,從而完成數據大小和文件個數的校驗。如果您的DistCp任務包含壓縮或者解壓縮,則--diff不能顯示正確的文件差異,因為壓縮或者解壓縮會改變文件的大小。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --dest oss://<yourBucketName>/hourly_table --previousManifest=file:///opt/manifest-2020-04-17.gz --copyFromManifest --parallelism 20
說明 如果您的--dest為HDFS路徑,目前僅支持/pathhdfs://hostname:ip/pathhdfs://headerIp:ip/path的寫法,暫不支持hdfs:///pathhdfs:/path和其他自定義寫法。
您也可以直接使用--update,增量更新有差異的文件。
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --update --parallelism 20

--queue

您可以使用--queue來指定本次DistCp任務所在Yarn隊列的名稱。

命令示例如下。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --queue yarnqueue

--bandwidth

您可以使用--bandwidth來指定本次DistCp任務單個節點所用的帶寬(以MB為單位),避免占用過大帶寬。

--update

使用--update參數可以一鍵執行增量更新,跳過完全相同的文件和目錄,直接將src中新增或發生改變的文件和目錄同步到dest上。

命令示例如下。
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --update --parallelism 20

--filters

使用--filters參數可以指定一個文件路徑,在這個文件中,一行配置一個正則表達式,對應distcp任務中需要跳過的文件,即用正則表達式過濾不希望參與copy和diff的文件。

命令示例如下。
hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table -filters /path/to/filterfile.txt --parallelism 20
正則表達式示例文件如下:
.*\.tmp.
.*\.staging.*

使用上述示例文件,會匹配hdfs://data/incoming/hourly_tabl下任何帶有.tmp.staging字符串的文件路徑,在copy或diff時排除這些匹配到的路徑不做后續操作。

使用OSS AccessKey

在E-MapReduce外或者免密服務出現問題的情況下,您可以通過指定AccessKey來獲得訪問OSS的權限。您可以在命令中使用--ossKey、--ossSecret、--ossEndPoint選項來指定AccessKey。

命令示例如下。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --ossKey <yourAccessKeyId> --ossSecret <yourAccessKeySecret> --ossEndPoint oss-cn-hangzhou.aliyuncs.com --parallelism 20

本文示例中的yourAccessKeyId是您阿里云賬號的AccessKey ID,yourAccessKeySecret是您阿里云賬號的AccessKey Secret。

使用冷歸檔、歸檔或低頻寫入OSS

在您的Distcp任務寫入OSS時,您可以通過--policy來指定以冷歸檔、歸檔和低頻的模式寫入OSS,進行數據存儲。
  • 使用冷歸檔(--policy coldArchive)示例命令如下。
    hadoop jar jindo-distcp-3.5.0.jar --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy coldArchive --parallelism 20
    說明 冷歸檔存儲類型目前只支持部分地域,詳情請參見存儲類型介紹
  • 使用歸檔(--policy archive)示例命令如下。
    jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy archive --parallelism 20
  • 使用低頻(--policy ia)示例命令如下。
    jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --policy ia --parallelism 20

使用云監控(CloudMonitor)服務監控告警

云監控(CloudMonitor)可以用于收集阿里云資源的監控指標或用戶自定義的監控指標、探測服務可用性、以及針對指標設置警報。使您全面了解阿里云資源的使用情況和業務運行狀況,并及時對故障資源進行處理,保證業務正常運行。

您可以指定本次Jindo DistCp任務結束后是否需要開啟對接云監控(CloudMonitor),上報任務失敗信息,并使用云監控(CloudMonitor)控制臺來配置告警功能。

操作步驟如下:
  1. 創建報警聯系人或報警聯系組,詳情請參見創建報警聯系人或報警聯系組
  2. 獲取報警Token。
    1. 在左側導航欄,選擇報警服務 > 報警聯系人
    2. 報警聯系人頁面,單擊報警聯系組頁簽。
    3. 單擊報警聯系組所在行的接入外部報警

      報警聯系組對話框中,您可以獲取到報警的Token。

  3. 報警聯系組面板,單擊自定義測試命令配置環境變量。
    參數 描述
    cmsAccessKeyId 阿里云賬號的AccessKey ID。
    cmsAccessSecret 阿里云賬號的AccessKey Secret。
    cmsRegion EMR集群所在地域,例如cn-hangzhou。
    cmsToken 步驟2中獲取到的Token。
    cmsLevel 報警級別,支持如下級別:
    • INFO:郵件+釘釘機器人
    • WARN:短信+郵件+釘釘機器人
    • CRITICAL:電話+短信+郵件+釘釘機器人
    示例命令如下:
    export cmsAccessKeyId=<your_key_id>
    export cmsAccessSecret=<your_key_secret>
    export cmsRegion=cn-hangzhou
    export cmsToken=<your_cms_token>
    export cmsLevel=WARN
    
    hadoop jar jindo-distcp-3.5.0.jar \
    --src /data/incoming/hourly_table \
    --dest oss://yang-hhht/hourly_table \
    --enableCMS

清理殘留文件

在您的DistCp任務過程中,由于某種原因在您的目標目錄下,產生未正確上傳的文件,這部分文件通過uploadId的方式由OSS管理,并且對用戶不可見時,您可以通過指定--cleanUpPending選項,指定任務結束時清理殘留文件,或者您也可以通過OSS控制臺進行清理。

命令示例如下。
jindo distcp --src /data/incoming/hourly_table --dest oss://<yourBucketName>/hourly_table --cleanUpPending --parallelism 20

使用s3作為數據源

您可以在命令中使用--s3Key、--s3Secret、--s3EndPoint選項來指定連接s3的相關信息。

代碼示例如下。
jindo distcp jindo-distcp-2.7.3.jar --src s3a://yourbucket/ --dest oss://<your_bucket>/hourly_table --s3Key yourkey --s3Secret yoursecret --s3EndPoint s3-us-west-1.amazonaws.com 
您可以配置s3Key、s3Secret、s3EndPoint在Hadoop的core-site.xml文件里 ,避免每次使用時填寫Accesskey。
<configuration>
    <property>
        <name>fs.s3a.access.key</name>
        <value>xxx</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>xxx</value>
    </property>

    <property>
        <name>fs.s3.endpoint</name>
        <value>s3-us-west-1.amazonaws.com</value>
    </property>
</configuration>
代碼示例如下。
jindo distcp /tmp/jindo-distcp-2.7.3.jar --src s3://smartdata1/ --dest s3://smartdata1/tmp --s3EndPoint  s3-us-west-1.amazonaws.com

查看Distcp Counters

執行以下命令,在MapReduce的Counter信息中查找Distcp Counters的信息。
JindoDistcpCounter
  BYTES_EXPECTED=10000
  BYTES_SKIPPED=10000
  FILES_EXPECTED=11
  FILES_SKIPPED=11
Shuffle Errors
  BAD_ID=0
  CONNECTION=0
  IO_ERROR=0
  WRONG_LENGTH=0
  WRONG_MAP=0
  WRONG_REDUCE=0    
任務計數器 說明
COPY_FAILED Copy失敗的文件數,不為0時告警。
CHECKSUM_DIFF Checksum校驗失敗的文件數,并計入COPY_FAILED。
FILES_EXPECTED 預期的Copy文件數量。
FILES_COPIED Copy成功的文件數。
FILES_SKIPPED update增量更新時跳過的文件數。
BYTES_SKIPPED update增量更新時跳過的字節數。
DIFF_FILES diff比較出的不相同的文件數,不為0時告警。
SAME_FILES 經diff校驗完全相同的文件數。
DST_MISS 目標路徑不存在的文件數,并計入DIFF_FILES。
LENGTH_DIFF 源文件和目標文件大小不一致的數量,并計入DIFF_FILES。
CHECKSUM_DIFF Checksum校驗失敗的文件數,并計入DIFF_FILES。
DIFF_FAILED diff操作異常的文件數,具體報錯參見日志信息。