本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
您可以使用TableRecordDataset接口按照行讀取MaxComepute表數據并構建數據流。
TensorFlow社區推薦在1.2及以上版本,使用Dataset接口代替線程和隊列構建數據流。通過多個Dataset接口的組合變換生成計算數據,可以簡化數據輸入代碼。
公共云GPU服務器即將過保下線,您可以繼續提交CPU版本的TensorFlow任務。如需使用GPU進行模型訓練,請前往DLC提交任務,具體操作請參見創建訓練任務。
接口說明
PAI-TF提供的TableRecordDataset與原生TensorFlow RecordDataset相似,可以為數據變換(Transformation)的Dataset接口提供數據源。TableRecordDataset的接口定義如下。
class TableRecordDataset(Dataset):
def __init__(self,
filenames,
record_defaults,
selected_cols=None,
excluded_cols=None,
slice_id=0,
slice_count=1,
num_threads=0,
capacity=0):
參數 | 描述 |
filenames | 待讀取的表名集合(列表),同一張表可以重復讀取。 |
record_defaults | 待讀取列的數據類型或列為空時的默認數據類型。如果該類型與實際讀取的列類型不符,或數據類型無法自動轉換,則執行過程中系統會拋出異常。系統支持的數據類型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING。 |
selected_cols | 選取的列,格式為英文逗號(,)分隔的字符串。 |
excluded_cols | 排除的列,格式為英文逗號(,)分隔的字符串。不能同時使用excluded_cols和selected_cols。 |
slice_id | 當前分區的編號。分布式讀取時,系統根據slice_count將表平均分為多個分區,讀取slice_id對應的分區。 |
slice_count | 分布式讀取時,總的分區數量,通常為Worker數量。 |
num_threads | 預取數據時,每個訪問表的內置Reader啟用的線程(獨立于計算線程)數量。取值范圍為1~64。如果num_threads取值為0,則系統自動將新建的預取線程數配置為計算線程池線程數的1/4。 說明 因為I/O對每個模型的整體計算影響不同,所以提高預取線程數,不一定可以提升整體模型的訓練速度。 |
capacity | 讀取表的總預取量,單位為行數。如果num_threads大于1,則每個線程的預取量為capacity/num_threads行(向上取整)。如果capacity為0,則內置Reader根據所讀表的前N行(系統默認N=256)平均值自動配置總預取量,使得每個線程的預取數據約占空間64 MB。 說明 如果手動配置預取量,當單線程的預取量大于1 GB,系統僅輸出告警信息以提示您檢查配置,而不會中斷程序運行。 |
如果MaxCompute表字段為DOUBLE類型,則TensorFlow中需要使用np.float格式與其對應。
返回值
TableRecordDataset返回一個新的Dataset對象,可以作為Pipeline工作流構建的輸入。
### other definition codes was ignored here.
# Suppose an odps table named 'sample_table' was built in
# 'test' project, which includes 5 columns:
# (itemid bigint, name string, price double,
# virtual bool, tags string)
# Table name would be passed from run commands.
tables = ["odps://test/tables/sample_table"]
# Firstly, we define a new TableRecordDataset to read itemid and price.
dataset = tf.data.TableRecordDataset(tables,
record_defaults = (0, 0.0),
selected_cols = "itemid, price")
# Get a batch of 128
dataset = dataset.batch(128)
# Set epoch as 10
dataset = dataset.repeat(10)
# At last we got a batch of ids and prices.
[ids, prices] = dataset.make_one_shot_iterator().get_next()
### Then we do other graph construction and finally run the graph.
執行Session時調用get_next()
方法,從表中讀取128行數據,并根據record_defaults指定的類型將每列數據解析為對應類型的Tensor。其中get_next()
返回的output_types需要與record_defaults的參數類型相同,output_shapes的Tensor Shape需要與record_defaults的元素數量一致。
Console參數
如果將表作為輸入,提交任務時,需要使用-Dtables配置待訪問的表名。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample;
如果讀取2張以上的表,則需要使用英文逗號(,)分隔多個表名。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample,odps://algo_platform_dev/tables/sample2
如果訪問分區表,則需要在表名后添加分區。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample/pt=1;
示例
以邏輯回歸(Logistic Regression)為例,介紹如何使用TableRecordDataset讀取表數據并進行模型訓練。
數據準備。
TableRecordReader是將整行數據作為一個字符串導入MaxCompute表,讀取之后再進行解析。而使用TableRecordDataset時,建議MaxCompute數據表按照列存放相應的數據,Dataset接口會將表中的數據以指定類型的Tensor返回。
創建表。
使用MaxCompute創建一個包含四列數據的表。
odps@ algo_platform_dev>create table sample (col1 double, col2 double, col3 double, col4 double); Data Health Manager:Your health synthesize score is 5, so, your job priority is 7 ID = 201803050245351****6Vgsxo2 OK odps@ algo_platform_dev>read sample; +------------+------------+------------+------------+ | col1 | col2 | col3 | col4 | +------------+------------+------------+------------+ +------------+------------+------------+------------+
導入數據。
下載測試數據,并使用MaxCompute Console Tunnel命令將其導入MaxCompute表。
#查看下載的測試數據。 $head -n 3 sample.csv 0,0,0.017179100152531324,1 0,1,0.823381420409002,1 0,2,1.6488850495540865,1
#將數據導入MaxCompute表。 odps@ algo_platform_dev>tunnel upload /tmp/data/sample.csv sample -fd=,; Upload session: 20180305135640c8cc650a0000**** Start upload:sample.csv Using \n to split records Upload in strict schema mode: true Total bytes:260093 Split input to 1 blocks 2018-03-05 13:56:40 scan block: '1' 2018-03-05 13:56:40 scan block complete, blockid=1 2018-03-05 13:56:40 upload block: '1' 2018-03-05 13:56:41 upload block complete, blockid=1 upload complete, average speed is 254 KB/s OK odps@ algo_platform_dev>read sample 3; +------------+------------+------------+------------+ | col1 | col2 | col3 | col4 | +------------+------------+------------+------------+ | 0.0 | 0.0 | 0.017179100152531324 | 1.0 | | 0.0 | 1.0 | 0.823381420409002 | 1.0 | | 0.0 | 2.0 | 1.6488850495540865 | 1.0 | +------------+------------+------------+------------+
說明因為該測試數據的每行內容使用英文逗號(,)分隔,所以使用
-fd=,
配置分隔符為英文逗號(,)才能將每行數據分為四列導入至相應的MaxCompute表。
構建輸入數據和模型。
構建輸入數據的示例代碼如下。除無需定義tf.train.Coordinator和運行start_queue_runners以外,其余代碼與使用TableRecordReader的代碼相同。
#define the input def input_fn(): dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128) v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next() labels = tf.reshape(tf.cast(v4, tf.int32), [128]) features = tf.stack([v1, v2, v3], axis=1) return features, labels
完整的示例代碼lr_dataset.py如下。
import tensorflow as tf tf.app.flags.DEFINE_string("tables", "", "tables info") FLAGS = tf.app.flags.FLAGS #define the input def input_fn(): dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128) v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next() labels = tf.reshape(tf.cast(v4, tf.int32), [128]) features = tf.stack([v1, v2, v3], axis=1) return features, labels #construct the model def model_fn(features, labels): W = tf.Variable(tf.zeros([3, 2])) b = tf.Variable(tf.zeros([2])) pred = tf.matmul(features, W) + b loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred,labels=labels)) # Gradient Descent optimizer = tf.train.GradientDescentOptimizer(0.05).minimize(loss) return loss, optimizer features, labels = input_fn() loss, optimizer = model_fn(features, labels) init = tf.global_variables_initializer() local_init = tf.local_variables_initializer() sess = tf.Session() sess.run(init) sess.run(local_init) for step in range(10000): _, c = sess.run([optimizer, loss]) if step % 2000 == 0: print("loss," , c)
提交任務。
odps@ algo_platform_dev>pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample -Dscript=file:///tmp/lr_dataset.py;
查看執行結果。
單擊提交任務返回的Logview鏈接,查看執行結果。
start launching tensorflow job ('loss,', 0.6931472) ('loss,', 0.007929571) ('loss,', 0.0016527221) ('loss,', 0.0023481336) ('loss,', 0.0011788738)