日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

TableRecordDataset

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

您可以使用TableRecordDataset接口按照行讀取MaxComepute表數據并構建數據流。

TensorFlow社區推薦在1.2及以上版本,使用Dataset接口代替線程和隊列構建數據流。通過多個Dataset接口的組合變換生成計算數據,可以簡化數據輸入代碼。

警告

公共云GPU服務器即將過保下線,您可以繼續提交CPU版本的TensorFlow任務。如需使用GPU進行模型訓練,請前往DLC提交任務,具體操作請參見創建訓練任務

接口說明

PAI-TF提供的TableRecordDataset與原生TensorFlow RecordDataset相似,可以為數據變換(Transformation)的Dataset接口提供數據源。TableRecordDataset的接口定義如下。

class TableRecordDataset(Dataset):
  def __init__(self,
               filenames,
               record_defaults,
               selected_cols=None,
               excluded_cols=None,
                slice_id=0,
                slice_count=1,
               num_threads=0,
               capacity=0):

參數

描述

filenames

待讀取的表名集合(列表),同一張表可以重復讀取。

record_defaults

待讀取列的數據類型或列為空時的默認數據類型。如果該類型與實際讀取的列類型不符,或數據類型無法自動轉換,則執行過程中系統會拋出異常。系統支持的數據類型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING。

selected_cols

選取的列,格式為英文逗號(,)分隔的字符串。

excluded_cols

排除的列,格式為英文逗號(,)分隔的字符串。不能同時使用excluded_colsselected_cols

slice_id

當前分區的編號。分布式讀取時,系統根據slice_count將表平均分為多個分區,讀取slice_id對應的分區。

slice_count

分布式讀取時,總的分區數量,通常為Worker數量。

num_threads

預取數據時,每個訪問表的內置Reader啟用的線程(獨立于計算線程)數量。取值范圍為1~64。如果num_threads取值為0,則系統自動將新建的預取線程數配置為計算線程池線程數的1/4。

說明

因為I/O對每個模型的整體計算影響不同,所以提高預取線程數,不一定可以提升整體模型的訓練速度。

capacity

讀取表的總預取量,單位為行數。如果num_threads大于1,則每個線程的預取量為capacity/num_threads行(向上取整)。如果capacity0,則內置Reader根據所讀表的前N行(系統默認N=256)平均值自動配置總預取量,使得每個線程的預取數據約占空間64 MB。

說明

如果手動配置預取量,當單線程的預取量大于1 GB,系統僅輸出告警信息以提示您檢查配置,而不會中斷程序運行。

說明

如果MaxCompute表字段為DOUBLE類型,則TensorFlow中需要使用np.float格式與其對應。

返回值

TableRecordDataset返回一個新的Dataset對象,可以作為Pipeline工作流構建的輸入。

### other definition codes was ignored here.

# Suppose an odps table named 'sample_table' was built in
# 'test' project, which includes 5 columns:
#   (itemid bigint, name string, price double,
#    virtual bool, tags string)

# Table name would be passed from run commands.
tables = ["odps://test/tables/sample_table"]

# Firstly, we define a new TableRecordDataset to read itemid and price.
dataset = tf.data.TableRecordDataset(tables,
                                     record_defaults = (0, 0.0),
                                     selected_cols = "itemid, price")
# Get a batch of 128
dataset = dataset.batch(128)
# Set epoch as 10
dataset = dataset.repeat(10)
# At last we got a batch of ids and prices.
[ids, prices] = dataset.make_one_shot_iterator().get_next()

### Then we do other graph construction and finally run the graph.

執行Session時調用get_next()方法,從表中讀取128行數據,并根據record_defaults指定的類型將每列數據解析為對應類型的Tensor。其中get_next()返回的output_types需要與record_defaults的參數類型相同,output_shapes的Tensor Shape需要與record_defaults的元素數量一致。

Console參數

  • 如果將表作為輸入,提交任務時,需要使用-Dtables配置待訪問的表名。

    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample;
  • 如果讀取2張以上的表,則需要使用英文逗號(,)分隔多個表名。

    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample,odps://algo_platform_dev/tables/sample2
  • 如果訪問分區表,則需要在表名后添加分區。

    pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample/pt=1;

示例

以邏輯回歸(Logistic Regression)為例,介紹如何使用TableRecordDataset讀取表數據并進行模型訓練。

  1. 數據準備。

    TableRecordReader是將整行數據作為一個字符串導入MaxCompute表,讀取之后再進行解析。而使用TableRecordDataset時,建議MaxCompute數據表按照列存放相應的數據,Dataset接口會將表中的數據以指定類型的Tensor返回。

    1. 創建表。

      使用MaxCompute創建一個包含四列數據的表。

      odps@ algo_platform_dev>create table sample (col1 double, col2 double, col3 double, col4 double);
      Data Health Manager:Your health synthesize score is 5, so, your job priority is 7
      
      ID = 201803050245351****6Vgsxo2
      OK
      odps@ algo_platform_dev>read sample;
      +------------+------------+------------+------------+
      | col1       | col2       | col3       | col4       |
      +------------+------------+------------+------------+
      +------------+------------+------------+------------+
    2. 導入數據。

      下載測試數據,并使用MaxCompute Console Tunnel命令將其導入MaxCompute表。

      #查看下載的測試數據。
      $head -n 3 sample.csv
      0,0,0.017179100152531324,1
      0,1,0.823381420409002,1
      0,2,1.6488850495540865,1
      #將數據導入MaxCompute表。
      odps@ algo_platform_dev>tunnel upload /tmp/data/sample.csv sample -fd=,;
      Upload session: 20180305135640c8cc650a0000****
      Start upload:sample.csv
      Using \n to split records
      Upload in strict schema mode: true
      Total bytes:260093   Split input to 1 blocks
      2018-03-05 13:56:40 scan block: '1'
      2018-03-05 13:56:40 scan block complete, blockid=1
      2018-03-05 13:56:40 upload block: '1'
      2018-03-05 13:56:41 upload block complete, blockid=1
      upload complete, average speed is 254 KB/s
      OK
      odps@ algo_platform_dev>read sample 3;
      +------------+------------+------------+------------+
      | col1       | col2       | col3       | col4       |
      +------------+------------+------------+------------+
      | 0.0        | 0.0        | 0.017179100152531324 | 1.0        |
      | 0.0        | 1.0        | 0.823381420409002 | 1.0        |
      | 0.0        | 2.0        | 1.6488850495540865 | 1.0        |
      +------------+------------+------------+------------+
      說明

      因為該測試數據的每行內容使用英文逗號(,)分隔,所以使用-fd=,配置分隔符為英文逗號(,)才能將每行數據分為四列導入至相應的MaxCompute表。

  2. 構建輸入數據和模型。

    構建輸入數據的示例代碼如下。除無需定義tf.train.Coordinator和運行start_queue_runners以外,其余代碼與使用TableRecordReader的代碼相同。

    #define the input
    def input_fn():
        dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128)
        v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
        labels = tf.reshape(tf.cast(v4, tf.int32), [128])
        features = tf.stack([v1, v2, v3], axis=1)
        return features, labels

    完整的示例代碼lr_dataset.py如下。

    import tensorflow as tf
    
    tf.app.flags.DEFINE_string("tables", "", "tables info")
    FLAGS = tf.app.flags.FLAGS
    
    #define the input
    def input_fn():
        dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128)
        v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next()
        labels = tf.reshape(tf.cast(v4, tf.int32), [128])
        features = tf.stack([v1, v2, v3], axis=1)
        return features, labels
    
    #construct the model
    def model_fn(features, labels):
        W = tf.Variable(tf.zeros([3, 2]))
        b = tf.Variable(tf.zeros([2]))
        pred = tf.matmul(features, W) + b
    
        loss =  tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred,labels=labels))
    
        # Gradient Descent
        optimizer = tf.train.GradientDescentOptimizer(0.05).minimize(loss)
        return loss, optimizer
    
    features, labels = input_fn()
    loss, optimizer = model_fn(features, labels)
    
    init = tf.global_variables_initializer()
    local_init = tf.local_variables_initializer()
    
    sess = tf.Session()
    sess.run(init)
    sess.run(local_init)
    
    for step in range(10000):
        _, c = sess.run([optimizer, loss])
        if step % 2000 == 0:
            print("loss," , c)
  3. 提交任務。

    odps@ algo_platform_dev>pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample -Dscript=file:///tmp/lr_dataset.py;
  4. 查看執行結果。

    單擊提交任務返回的Logview鏈接,查看執行結果。

    start launching tensorflow job
    ('loss,', 0.6931472)
    ('loss,', 0.007929571)
    ('loss,', 0.0016527221)
    ('loss,', 0.0023481336)
    ('loss,', 0.0011788738)