日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

PyODPS支持對MaxCompute SQL的基本操作,本文為您介紹如何在PyODPS中使用SQL。

背景信息

PyODPS提供對MaxCompute SQL的基本操作方法,方法如下所示。

方法名稱

方法說明

execute_sql()/run_sql()

執行SQL語句

open_reader()

讀取SQL執行結果

說明

在MaxCompute客戶端中可以執行的SQL語句并非都可以通過入口對象的execute_sql()run_sql()方法執行。在調用非DDL或非DML語句時,請使用其他方法。例如,調用GRANT或REVOKE語句時,請使用run_security_query方法;調用API命令時,請使用run_xflowexecute_xflow方法。

在Python UDF編寫過程中,如果某個UDF引用的資源是動態變化的,您可以在execute_sql()中設置alias給舊的資源一個別名作為新的資源,無需重新刪除或創建新的UDF。詳情請參見設置alias

執行SQL語句

PyODPS對MaxCompute SQL操作的具體說明如下。

  • 參數說明

    • statement:需要執行的SQL語句。

    • hints:設置運行時參數,參數類型是DICT。

  • 返回值說明

    執行execute_sql()run_sql()后的返回值是任務實例。詳情請參見任務實例

  • 使用示例

    • 示例1

      執行SQL語句。

      o.execute_sql('select * from table_name')  #同步的方式執行,會阻塞直到SQL語句執行完成。
      instance = o.run_sql('select * from table_name')  #異步的方式執行。
      print(instance.get_logview_address())  # 獲取Logview地址。
      instance.wait_for_success()  # 阻塞直到完成。
    • 示例2

      執行SQL語句時,運行參數。

      o.execute_sql('select * from pyodps_iris', hints={'odps.stage.mapper.split.size': 16})

      您也可以通過如下示例,設置sql.settings,對運行的參數進行全局配置,則在每次運行語句時都會執行對應參數,支持設置的全局參數請參見Flag參數列表

      from odps import options
      options.sql.settings = {'odps.stage.mapper.split.size': 16}
      o.execute_sql('select * from pyodps_iris')  # 會根據全局配置添加hints。

讀取SQL執行結果

您可以通過open_reader操作讀取SQL執行結果。有以下兩種情況:

  • 讀取表數據,返回結構化數據,通過for語句遍歷即可。

    with o.execute_sql('select * from table_name').open_reader() as reader:
        for record in reader:   # 處理每一個record。
            print(record)
  • 執行desc等命令,返回非結構化數據,需要通過reader.raw獲取執行結果。

    with o.execute_sql('desc table_name').open_reader() as reader:
        print(reader.raw)

在調用open_reader()時,PyODPS會默認調用舊的Result接口,可能會出現獲取數據超時或獲取數據受限等問題。您可以按照如下方法指定PyODPS調用Instance Tunnel。

  • 在腳本中設置options.tunnel.use_instance_tunnel =True

  • 按照如下示例,設置open_reader(tunnel=True)。從PyODPS v0.7.7.1開始,您可以通過open_reader()方法讀取全量數據。

    with o.execute_sql('select * from table_name').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)
說明
  • 如果您使用了較低版本的MaxCompute服務,或者調用Instance Tunnel出現了問題,PyODPS會給出警告并自動降級到舊的Result接口,您可根據警告信息判斷導致降級的原因。

  • 如果您使用的MaxCompute只能支持舊Result接口,并且需要讀取所有的數據,您可將SQL結果寫入另一張表后用讀表接口讀取(可能受到Project安全設置的限制)。

  • 更多Instance Tunnel說明,請參見Instance tunnel

PyODPS默認不限制從Instance讀取的數據規模,但Project Owner可能在MaxCompute Project上增加保護設置,以限制對Instance結果的讀取,此時只能使用受限讀取模式讀取數據,在此模式下可讀取的行數受到Project配置限制,通常為10000行。如果PyODPS檢測到讀取Instance數據被限制,且options.tunnel.limit_instance_tunnel未設置,會自動啟用受限讀取模式。

  • 如果您的Project被保護,想要手動啟用受限讀取模式,可以為open_reader()方法增加limit=True參數,例如open_reader(limit=True)。或者設置options.tunnel.limit_instance_tunnel = True

  • 在部分環境中(例如DataWorks),options.tunnel.limit_instance_tunnel可能默認被置為True,此時,如果想要讀取所有數據,需要為open_reader()方法增加tunnel=Truelimit=False參數,例如open_reader(tunnel=True, limit=False)

  • 重要

    如果Project本身被保護,tunnel=Truelimit=False選項不能解除保護,此時應聯系Project Owner開放相應的讀權限。

設置alias

如果某個UDF引用的資源是動態變化的,您可以通過設置alias給舊的資源一個別名作為新的資源,無需重新刪除或創建新的UDF。

from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# 寫入一行數據,只包含一個值1。
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把內容為1的資源別名設置成內容為2的資源。您不需要修改UDF或資源。
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

在少數情形下,提交SQL時需要同時提交biz_id,否則執行會報錯。此時,您可以通過設置全局options里的biz_id解決此類報錯。

from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')