日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

文檔

Python SDK

更新時間:

PythonSDK

安裝

快速安裝

$ sudo pip install pydatahub

源碼安裝

$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install

常見問題

1.如果安裝過程中出現錯誤信息’Python.h: No such file or directory’,常用的操作系統安裝方式如下

$ sudo apt-get install python-dev   # for python2.x installs
$ sudo apt-get install python3-dev  # for python3.x installs
$ sudo yum install python-devel   # for python2.x installs
$ sudo yum install python3-devel   # for python3 installs

2.如果使用windows操作系統,根據提示信息可到 此處 下載安裝對應版本的 Visual C++ SDK。Windows 10 安裝cprotobuf依賴時如果報類似如下錯誤,也表示需要安裝Visual C++ 生成工具:

bulding 'cprotobuf.internal' extention 
error: [WinError2] The system cannot find the file specified

推薦使用python3.6或以上,會明確提示所需版本及鏈接信息。

3.Windows 下如果安裝依賴時報類似如下錯誤,是環境問題所致,請搜索相關錯誤,根據具體情況,拷貝所需文件,或是直接使用 developer command prompt 工具進行安裝:

LINK : fatal error LNK1158: cannot run 'rc.exe'

4.Windows 7 如果提示如下錯誤,可安裝此 build tools

error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/

安裝驗證

$ python -c "from datahub import DataHub"

如果上述命令執行成功,恭喜你安裝DataHub Python版本SDK成功!

基本概念

詳見: 名詞解釋

準備工作

  • 訪問DataHub服務需要使用阿里云認證賬號,需要提供阿里云accessId及accessKey。 同時需要提供訪問的服務地址。

  • 創建Project

  • 初始化DataHub

import sys
import traceback
from datahub import DataHub
from datahub.exceptions import ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
access_id = ***your access id***
access_key = ***your access key***
endpoint = ***your datahub server endpoint***
dh = DataHub(access_id, access_key, endpoint)

Project操作

  • 創建示例

project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
    print("create project success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Topic操作

Tuple Topic

  • Tuple類型Topic寫入的數據是有格式的,需要指定Record Schema,目前支持以下幾種數據類型:

類型

含義

值域

Bigint

8字節有符號整型。請不要使用整型的最小值 (-9223372036854775808),這是系統保留值。

-9223372036854775807 ~ 9223372036854775807

String

字符串,只支持UTF-8編碼。

單個String列最長允許1MB。

Boolean

布爾型。

可以表示為True/False,true/false, 0/1

Double

8字節雙精度浮點數。

-1.0 10308 ~ 1.0 10308

TimeStamp

時間戳類型

表示到微秒的時間戳類型

  • 創建示例

topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

Blob Topic

  • Blob類型Topic支持寫入一塊二進制數據作為一個Record,數據將會以BASE64編碼傳輸。

topic_name = "blob_topic"
shard_count = 3
life_cycle = 7
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)

數據發布/訂閱

獲取Shard列表

  • list_shards接口獲取topic下的所有shard

shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))

返回結果是一個ListShardResult對象,包含一個Shard對象的list,list中的每個元素是一個shard,可以獲取shard_id,state狀態,begin_hash_key,end_hash_key等信息

發布數據

  • put_records接口向一個topic發布數據

put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)

其中傳入參數records是一個List對象,每個元素為一個record,但是必須為相同類型的record,即Tuple類型或者Blob類型,返回結果為PutRecordsResult對象,包含failed_record_count和failed_records成員,failed_records是一個FailedRecord對象的list,FailedRecord對象包含成員index,error_code和error_message

  • 寫入Tuple類型Record示例

try:
    # block等待所有shard狀態ready
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2,  1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" %(len(records0), put_result.failed_record_count))
    # failed_record_count如果大于0最好對failed record再進行重試
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
  • 寫入BLOB類型Record示例

try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)

獲取cursor

  • 獲取Cursor,可以通過三種方式獲取:OLDEST, LATEST, SYSTEM_TIME

    • OLDEST: 表示獲取的cursor指向當前有效數據中時間最久遠的record

    • LATEST: 表示獲取的cursor指向當前最新的record

    • SYSTEM_TIME: 表示獲取的cursor指向大于等于該時間(單位毫秒)的第一條record

shard_id = '0'
time_stamp = 0
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor

通過get_cursor接口獲取用于讀取指定位置之后數據的cursor

訂閱數據

  • 從指定shard讀取數據,需要指定從哪個Cursor開始讀,并指定讀取的上限數據條數,如果從Cursor到shard結尾少于Limit條數的數據,則返回實際的條數的數據。

project_name = 'project'
shard_id = "0"
limit = 10
# 讀取blob topic的record
topic_name = 'blob_topic'
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# 讀取tuple topic的record
topic_name = 'tuple_topic'
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
  • 消費Tuple類型Record示例

    try:
      # block等待所有shard狀態ready
      dh.wait_shards_ready(project_name, topic_name)
      print("shards all ready!!!")
      print("=======================================\n\n")
      topic_result = dh.get_topic(project_name, topic_name)
      print(topic_result)
      if topic_result.record_type != RecordType.TUPLE:
          print("topic type illegal!")
          sys.exit(-1)
      print("=======================================\n\n")
      shard_id = '0'
      limit = 10
      cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
      cursor = cursor_result.cursor
      while True:
          get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
          for record in get_result.records:
              print(record)
          if 0 == get_result.record_count:
              time.sleep(1)
          cursor = get_result.next_cursor
    except DatahubException as e:
      print(e)
      sys.exit(-1)

    結尾

  • Python API Doc

  • Python Package Index

  • GitHub地址