日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

Python SDK示例:SQL

更新時間:

本文為您介紹Python SDK中執行SQL命令相關的典型場景操作示例。

注意事項

PyODPS支持MaxCompute SQL查詢,并可以讀取執行的結果,使用時有以下注意事項。

  • 入口對象的execute_sql('statement')run_sql('statement')方法可以執行SQL語句,返回值是運行實例,詳情請參見任務實例

  • 目前暫不支持使用Arrow格式讀取Instance結果。

  • 并非所有可以執行的MaxCompute命令都是PyODPS可以接受的SQL語句。在調用非DDL、DML語句時,請使用其他方法,例如:

    • GRANT、REVOKE等語句請使用run_security_query方法。

    • PAI命令請使用run_xflow或execute_xflow方法。

  • 調用SQL引擎執行SQL,會按照SQL作業進行計費,計費詳情請參見計費項與計費方式概述

執行SQL

import os
from odps import ODPS
# 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環境變量設置為用戶 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境變量設置為用戶 Access Key Secret,
# 不建議直接使用 Access Key ID / Access Key Secret 字符串
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
o.execute_sql('select * from table_name')  #  同步的方式執行,會阻塞直到SQL執行完成。
instance = o.run_sql('select * from table_name')  # 以異步的方式提交
print(instance.get_logview_address())  # 獲取logview地址
instance.wait_for_success()  # 阻塞直到完成

設置運行參數

在運行時如果需要設置參數,您可以通過設置hints參數來實現,參數的類型是dict。

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

您可以對于全局配置設置sql.settings,后續每次運行時則都會添加相關的運行時參數。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris')  # 會根據全局配置添加hints

讀取SQL執行結果

運行SQL的Instance能夠直接執行open_reader操作讀取SQL執行結果。讀取時會出現以下兩種情況:

  • SQL返回了結構化的數據。

    with o.execute_sql('select * from table_name').open_reader() as reader:
        for record in reader:
            print(record)  # 處理每一個record
  • SQL可能執行了desc命令,這時可以通過reader.raw取到原始的SQL執行結果。

    with o.execute_sql('desc table_name').open_reader() as reader:
        print(reader.raw)

設置使用哪種結果接口

如果您設置了options.tunnel.use_instance_tunnel == True,在后續調用open_reader時,PyODPS會默認調用Instance Tunnel, 否則會調用舊的Result接口。如果您使用了版本較低的 MaxCompute服務,或者調用Instance Tunnel出現了問題,PyODPS會給出告警并自動降級到舊的Result接口,您可根據告警信息判斷導致降級的原因。如果Instance Tunnel的返回結果不合預期, 您可以將該選項設為False,在調用open_reader時,也可以使用tunnel參數來指定使用何種結果接口。

  • 使用Instance Tunnel

    with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)  # 處理每一個record
  • 使用 Results 接口

    with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
        for record in reader:
            print(record)  # 處理每一個record

設置讀取數據規模

如果您想要限制下載數據的規模,可以為open_reader增加limit選項, 或者設置 options.tunnel.limit_instance_tunnel = True 。如果未設置 options.tunnel.limit_instance_tunnel,MaxCompute會自動打開數據量限制,此時,可下載的數據條數受到Project配置的Tunnel下載數據規模數限制, 通常該限制為10000條。

設置讀取結果為pandas DataFrame

# 直接使用 reader 的 to_pandas 方法
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # pd_df 類型為 pandas DataFrame
    pd_df = reader.to_pandas()

設置讀取速度(進程數)

說明

多進程加速僅在 PyODPS 0.11.3 及以上版本中支持。

您可以通過n_process指定使用進程數。

import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # n_process 指定成機器核數
    pd_df = reader.to_pandas(n_process=n_process)

設置alias

在運行SQL時,如果某個UDF引用的資源是動態變化的,您可以alias舊的資源名到新的資源,這樣就可以避免重新刪除并重新創建UDF。

from odps.models import Schema

myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# 寫入一行數據,只包含一個值1
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把內容為1的資源alias成內容為2的資源,不需要修改UDF或資源
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

在交互式環境執行 SQL

在ipython和jupyter里支持使用SQL插件的方式運行SQL,且支持參數化查詢, 詳情參考交互體驗增強文檔

設置biz_id

在少數情形下,在提交SQL時,可能需要同時提交biz_id,否則執行會報錯。此時,您可以在全局options里設置biz_id。

from odps import options

options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')