日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark訪問Table Store

本文介紹Spark如何消費Table Store的數據。

Spark接入Table Store

準備一張數據表pet,其中name為主鍵。

name

owner

species

sex

birth

death

Fluffy

Harold

cat

f

1993-02-04

-

Claws

Gwen

cat

m

1994-03-17

-

Buffy

Harold

dog

f

1989-05-13

-

Fang

Benny

dog

m

1990-08-27

-

Bowser

Diane

dog

m

1979-08-31

1995-07-29

Chirpy

Gwen

bird

f

1998-09-11

-

Whistler

Gwen

bird

-

1997-12-09

-

Slim

Benny

snake

m

1996-04-29

-

Puffball

Diane

hamster

f

1999-03-30

-

以下示例演示了如何在Spark中消費Table Store的數據。

private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    Configuration hadoopConf = new Configuration();
    JavaSparkContext sc = null;
    try {
        sc = new JavaSparkContext(sparkConf);
        Configuration hadoopConf = new Configuration();
        TableStore.setCredential(
                hadoopConf,
                new Credential(accessKeyId, accessKeySecret, securityToken));
        Endpoint ep = new Endpoint(endpoint, instance);
        TableStore.setEndpoint(hadoopConf, ep);
        TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                hadoopConf, TableStoreInputFormat.class,
                PrimaryKeyWritable.class, RowWritable.class);
        System.out.println(
            new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        if (sc != null) {
            sc.close();
        }
    }
}

spark-sql訪問Table Store

命令示例如下。

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
說明
  • 運行代碼示例前必須先配置環境變量。關于如何配置環境變量,請參見配置環境變量

  • /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*中包含TableStore DataSource類型,如果您EMR集群使用的是Spark2,則應該將上面命令中的spark3替換為spark2

  • 針對Spark3,如果查詢Table Store時報錯java.lang.ClassNotFoundException: org.apache.commons.net.util.Base64,則需要添加commons-net依賴,--jars參數值可以改為/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar

  • 如果您不想在每次執行命令時都添加--jars參數,可以在Spark服務的配置頁簽,修改配置項spark.driver.extraClassPathspark.executor.extraClassPath,在配置項的值中添加/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar的內容

建表和讀取數據示例如下。

create table test_tableStore
using tablestore
  options(endpoint = 'https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  access.key.id = '${hiveconf:accessKeyId}',
  access.key.secret = '${hiveconf:accessKeySecret}',
  table.name = 'test_table',
  instance.name = 'test_instance',
  catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);

select * from test_tableStore

相關文檔