日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MaxCompute使用

在PAI子產品(DLC或DSW)中,您可以通過阿里云MaxCompute提供的PyODPS或人工智能平臺PAI自主研發的paiio,實現MaxCompute數據的讀寫操作。針對不同的應用場景,您可以選擇合適的MaxCompute數據讀取方式。

功能介紹

  • PyODPS

    PyODPS是阿里云MaxCompute的Python版本的SDK,提供簡單方便的Python編程接口。您可以使用PyODPS完成上傳和下載文件、創建表、運行ODPS SQL查詢等操作。詳情請參見PyODPS概述

  • paiio

    為了在PAI子產品中方便地讀寫MaxCompute表數據,PAI團隊自主研發了paiio模塊。paiio支持以下三種接口:

    接口

    區別

    功能描述

    TableRecordDataset

    依賴TensorFlow框架,推薦在1.2及以上版本中使用Dataset接口(詳情請參見Dataset),替代原有的線程和隊列接口構建數據流。

    讀取MaxCompute表數據。

    TableReader

    不依賴TensorFlow框架,基于MaxCompute SDK實現,可以直接訪問MaxCompute表并即時獲取I/O結果。

    讀取MaxCompute表數據。

    TableWriter

    不依賴TensorFlow框架,基于MaxCompute SDK實現,可以直接對MaxCompute表進行寫入并返回。

    往MaxCompute表中寫入數據。

前提條件

使用限制

paiio模塊不支持使用自定義鏡像,僅選擇TensorFlow 1.12、1.15和2.0版本對應的鏡像時,才可使用paiio模塊。

PyODPS

您可以使用PyODPS實現MaxCompute數據的讀寫操作。

  1. 執行如下命令,安裝PyODPS。

    pip install pyodps
  2. 執行如下命令檢查安裝是否成功。若無返回值和報錯信息表示安裝成功。

    python -c "from odps import ODPS"
  3. 如果您使用的Python不是系統默認的Python版本,安裝完PIP后,您可以執行如下命令進行Python版本切換。

    /home/tops/bin/python3.7 -m pip install setuptools>=3.0
    #/home/tops/bin/python3.7為安裝的python路徑
  4. 通過PyODPS讀寫MaxCompute數據。

    import numpy as np
    import pandas as pd
    import os
    
    from odps import ODPS
    from odps.df import DataFrame
    # 建立鏈接。
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # 以直接指定字段名以及字段類型的方式創建非分區表my_new_table。
    
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # 向非分區表my_new_table中插入數據。
    records = [[111, 'aaa'],
              [222, 'bbb'],
              [333, 'ccc'],
              [444, '中文']]
    o.write_table(table, records)
    
    # 讀取數據。
    sql = '''
    SELECT  
        *
    FROM
        your-default-project.<table>
    LIMIT 100
    ;
    '''
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=1) # n_process配置可參考機器配置,取值大于1時可以開啟多線程加速。
    

    其中:

    • ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET:需將該環境變量設置為您的阿里云賬號的AccessKey ID和 AccessKey Secret。

      說明

      不建議直接使用AccessKey ID和AccessKey Secret字符串。

    • your-default-projectyour-end-point:需替換為您設置的默認項目名稱與Endpoint信息,各地域的Endpoint請參見Endpoint

    更多關于如何使用PyODPS對MaxCompute表進行其他操作,詳情請參見

paiio

準備工作:配置賬戶信息

使用paiio模塊讀寫MaxCompute表數據之前,需要配置MaxCompute賬戶的AccessKey信息。PAI支持從配置文件讀取配置信息,您可以將配置文件放置在掛載的文件系統中,然后在代碼中通過環境變量引用。

  1. 編寫配置文件,內容如下。

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    參數

    描述

    access_id

    阿里云賬號的AccessKey ID。

    access_key

    阿里云賬號的AccessKey Secret。

    end_point

    MaxCompute的Endpoint,例如華東2(上海)配置為http://service.cn-shanghai.maxcompute.aliyun.com/api。詳情請參見Endpoint

  2. 在代碼中指定配置文件路徑,方式如下。

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    其中<your MaxCompute config file path>表示配置文件的路徑。

TableRecordDataset使用說明

接口說明

TensorFlow社區推薦在1.2及以上版本中使用Dataset接口(詳情請參見Dataset)替代原有的線程和隊列接口構建數據流。通過多個Dataset接口的組合變換生成計算數據,可以簡化數據輸入部分的代碼。

  • 接口定義(Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • 參數

    參數

    是否必選

    類型

    默認值

    描述

    filenames

    STRING

    待讀取的表名集合(列表),多張表的Schema必須一致。表名格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

    record_defaults

    LIST或TUPLE

    用于讀取列的數據類型轉換及列為空時的默認值。如果該值與實際讀取的列數不符,或數據類型無法自動轉換,則執行過程中系統會拋出異常。

    系統支持的數據類型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING,INT64類型的默認值請參見np.array(0, np.int64)

    selected_cols

    STRING

    None

    選取的列,格式為半角逗號(,)分隔的字符串。默認值None表示讀取所有列。該參數與excluded_cols不能同時使用。

    excluded_cols

    STRING

    None

    排除的列,格式為半角逗號(,)分隔的字符串。默認值None表示讀取所有列。該參數與selected_cols不能同時使用。

    slice_id

    INT

    0

    在分布式讀取場景下,當前分片的編號(從0開始編號)。分布式讀取時,系統根據slice_count將表均分為多個分片,讀取slice_id對應的分片。

    slice_id為默認值0時,如果slice_count取值為1,則表示讀取整張表。如果slice_count大于1,則表示讀取第0個分片。

    slice_count

    INT

    1

    在分布式讀取場景下,總的分片數量,通常為Worker數量。默認值1表示不分片,即讀取整張表,

    num_threads

    INT

    0

    預取數據時,每個訪問表的內置Reader啟用的線程(獨立于計算線程)數量。取值范圍為1~64。如果num_threads取值為0,則系統自動將新建的預取線程數配置為計算線程池線程數的1/4。

    說明

    因為I/O對每個模型的整體計算影響不同,所以提高預取線程數,不一定可以提升整體模型的訓練速度。

    capacity

    INT

    0

    讀取表的總預取量,單位為行數。如果num_threads大于1,則每個線程的預取量為capacity/num_threads行(向上取整)。如果capacity0,則內置Reader根據所讀表的前N行(系統默認N=256)平均值自動配置總預取量,使得每個線程的預取數據約占空間64 MB。

    說明

    如果MaxCompute表字段為DOUBLE類型,則TensorFlow中需要使用np.float64格式與其對應。

  • 返回值

    返回Dataset對象,可以作為Pipeline工作流構建的輸入。

使用示例

假設在myproject項目中存儲了一張名為test的表,其部分內容如下所示。

itemid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代碼實現了使用TableRecordDataset接口讀取test表itemid和price列的數據。

import os
import tensorflow as tf
import paiio

# 指定配置文件路徑。請替換為配置文件實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 定義要讀取的Table, 可以是多個。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
table = ["odps://${your_projectname}/tables/${table_name}"]
# 定義TableRecordDataset, 讀取表的itemid和price列。
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# 設置epoch 2, batch size 3, prefetch 100 batch。
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader使用說明

接口說明

TableReader基于MaxCompute SDK實現,不依賴TensorFlow框架,可以直接訪問MaxCompute表并即時獲取I/O結果。

  • 創建Reader并打開表

    • 接口定義

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • 參數

    • 參數

      是否必選

      類型

      默認值

      描述

      table

      STRING

      需要打開的MaxCompute表名,格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      STRING

      空字符串("")

      選取的列,格式為英文逗號(,)分隔的字符串。默認值空字符串("")表示讀取所有列。該參數與excluded_cols不能同時使用。

      excluded_cols

      STRING

      空字符串("")

      排除的列,格式為英文逗號(,)分隔的字符串。默認值空字符串("")表示讀取所有列。該參數與selected_cols不能同時使用。

      slice_id

      INT

      0

      在分布式讀取場景下,當前分片的編號,取值范圍為[0, slice_count-1]。分布式讀取時,系統根據slice_count將表均分為多個分片,讀取slice_id對應的分片。默認值0表示不分片,即讀取表的所有行。

      slice_count

      INT

      1

      在分布式讀取場景下,總的分片數量,通常為Worker數量。

    • 返回值

      Reader對象。

  • 讀取記錄

    • 接口定義

    • reader.read(num_records=1)
    • 參數

      num_records表示順序讀取的行數。默認值為1,即讀取1行。如果num_records參數超出未讀的行數,則返回讀取到的所有行。如果未讀取到記錄,則拋出OutOfRange異常(paiio.python_io.OutOfRangeException)。

    • 返回值

      返回一個numpy ndarray數組(或稱為recarray),數組中每個元素為表的一行數據組成的一個TUPLE。

  • 定位到相應行

    • 接口定義

    • reader.seek(offset=0)
    • 參數

    • offset表示定位到的行(行從0開始編號),下一個Read操作將從定位的行開始。如果配置了slice_idslice_count,則按分片位置進行相對行的定位。如果offset超出表的總行數,則系統拋出OutOfRange異常。如果之前的讀取位置已經超出表尾,則繼續進行seek系統會拋出OutOfRange異常(paiio.python_io.OutOfRangeException)。

      重要

      讀取一個batch_size時,如果剩余行數不足一個batch_size,則read操作會返回剩余行且不拋異常。此時,如果繼續進行seek操作,則系統會拋異常。

    • 返回值

      無返回值。如果操作出錯,則系統拋出異常。

  • 獲取表的總記錄數

    • 接口定義

    • reader.get_row_count()
    • 參數

    • 返回值

      返回表的行數。如果配置了slice_idslice_count,則返回分片大小。

  • 獲取表的Schema

    • 接口定義

    • reader.get_schema()
    • 參數

    • 返回值

    • 返回1D-stuctured ndarray,每個元素對應reader中選定的MaxCompute表中一列的Schema,包括如下三個元素。

      參數

      描述

      colname

      列名。

      typestr

      MaxCompute數據類型名稱。

      pytype

      typestr對應的Python數據類型。

      typestrpytype的對應關系如下表所示。

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      說明

      PAI-TensorFlow不支持對MAP類型數據進行操作。

      OBJECT

  • 關閉表

    • 接口定義

    • reader.close()
    • 參數

    • 返回值

      無返回值。如果操作出錯,則系統拋出異常。

使用示例

假設在myproject項目中存儲了一張名為test的表,其內容如下所示。

uid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代碼實現了使用TableReader讀取uidnameprice列的數據。

    import os
    import paiio
    
    # 指定配置文件路徑。請替換為配置文件實際存放的路徑。
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # 打開一張表,返回reader對象。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # 獲得表的總行數。
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # 讀表,返回值將是一個recarray數組,形式為[(uid, name, price)*2]。
    records = reader.read(batch_size) # 返回[(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # 返回[(17, "Watermelon", 2.2)]
    # 繼續讀取將拋出OutOfRange異常。
    
    # Close the reader.
    reader.close()

TableWriter使用說明

TableWriter基于MaxCompute SDK實現,不依賴TensorFlow框架,可以直接對MaxCompute表進行寫入并返回。

接口說明

  • 創建Writer并打開表

    • 接口定義

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      說明
      • 該接口不會清空原表中的數據,采用追加的方式寫入數據。

      • 對于新寫入的數據,關閉表之后才能對其進行讀取。

    • 參數

      參數

      是否必選

      類型

      默認值

      描述

      table

      STRING

      待打開的MaxCompute表名,格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      INT

      0

      在分布式場景,寫表至不同的分片,從而避免寫沖突。在單機場景,使用默認值0即可。在多機場景,如果多個Worker(包括PS)同時使用同一個slice_id寫表,則會導致寫入失敗。

    • 返回值

      返回Writer對象。

  • 寫入記錄

    • 接口定義

      writer.write(values, indices)
    • 參數

      參數

      是否必選

      類型

      默認值

      描述

      values

      STRING

      待寫入的數據。支持寫入單行數據或多行數據:

      • 如果僅寫入單行數據,則向values參數傳入一個由標量組成的TUPLE、LIST或1D-ndarray。如果傳入的是LIST或ndarray,則說明寫入的各列數據類型一致。

      • 如果寫入N行數據(N>=1),可以向values參數傳入一個LIST或1D-ndarray,參數中的每個元素都應該對應一個單行的數據(用TUPLE或LIST表示,也可以通過Structure形式存放于ndarray中)。

      indices

      INT

      指定數據寫入的列,支持傳入由INT類型Index組成的TUPLE、LIST或1D-ndarray。indices中每個數(i)對應表中相應的第i列(列數從0開始編號)。

    • 返回值

      無返回值。如果寫過程出錯,則系統會拋出異常并退出。

  • 關閉表

    • 接口定義

      writer.close()
      說明

      在with語句的區塊中,無需顯示調用close()接口關閉表。

    • 參數

    • 返回值

      無返回值。如果操作出錯,則系統拋出異常。

    • 示例

      通過with語句使用TableWriter,代碼如下。

      with paiio.python_io.TableWriter(table) as writer:
        # Prepare values for writing.
          writer.write(values, incides)
          # Table would be closed automatically outside this section.

使用示例

import paiio
import os

# 指定配置文件路徑。請替換為配置文件實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 準備數據。
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# 打開一個表,返回writer對象。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to the 0-3 columns of the table. 將數據寫至表中的第0-3列。
records = writer.write(values, indices=[0, 1, 2, 3])

# 關閉writer。
writer.close()