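This topic describes how to write data from Spark to HBase.
Example of accessing HBase from Spark
Important: The compute cluster must be in the same security group as the HBase cluster. Otherwise, the two clusters cannot reach each other over the network. When you create the compute cluster in the E-MapReduce console, select the security group that the HBase cluster belongs to.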
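A quick way to confirm that the security group setting took effect is to open a TCP connection from a node of the compute cluster to the HBase cluster. The following is a minimal sketch that uses only the Java standard library. The host name `hbase-zk-host.example` is a hypothetical placeholder, and the check assumes that the HBase cluster exposes ZooKeeper on its default client port 2181; adjust both to match your cluster.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class HBaseReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical defaults: replace with the ZooKeeper address of your HBase cluster.
        String host = args.length > 0 ? args[0] : "hbase-zk-host.example";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If the check reports that the address is not reachable, the two clusters are probably not in the same security group. A successful connection only shows that the port is open; it does not verify that HBase itself is healthy.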
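- Java code

      import java.util.ArrayList;
      import java.util.Iterator;
      import java.util.List;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.TableName;
      import org.apache.hadoop.hbase.client.BufferedMutator;
      import org.apache.hadoop.hbase.client.Connection;
      import org.apache.hadoop.hbase.client.Get;
      import org.apache.hadoop.hbase.client.Put;
      import org.apache.hadoop.hbase.client.Table;
      import org.apache.hadoop.hbase.spark.JavaHBaseContext;
      import org.apache.hadoop.hbase.util.Bytes;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.VoidFunction;
      import scala.Tuple2;

      // sparkConf and tableName are defined by the surrounding application.
      JavaSparkContext jsc = new JavaSparkContext(sparkConf);
      try {
        // Row keys to process.
        List<byte[]> list = new ArrayList<>();
        list.add(Bytes.toBytes("1"));
        ...
        list.add(Bytes.toBytes("5"));
        JavaRDD<byte[]> rdd = jsc.parallelize(list);

        Configuration conf = HBaseConfiguration.create();
        JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

        // Each partition receives its row keys together with a shared HBase connection.
        hbaseContext.foreachPartition(rdd,
            new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
              public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception {
                Table table = t._2().getTable(TableName.valueOf(tableName));
                BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
                while (t._1().hasNext()) {
                  byte[] b = t._1().next();
                  // Write only to rows that already exist. Table.exists() is used because
                  // Result.getExists() is null unless the Get checks existence only.
                  if (table.exists(new Get(b))) {
                    Put put = new Put(b);
                    // Add the columns to write with put.addColumn(family, qualifier, value);
                    // HBase rejects a Put that carries no columns.
                    mutator.mutate(put);
                  }
                }
                mutator.flush();
                mutator.close();
                table.close();
              }
            });
      } finally {
        jsc.stop();
      }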
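- Scala code

      import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
      import org.apache.hadoop.hbase.client.Put
      import org.apache.hadoop.hbase.spark.HBaseContext
      import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
      import org.apache.spark.SparkContext

      val sc = new SparkContext("local", "test")
      val config = HBaseConfiguration.create()
      ...
      val hbaseContext = new HBaseContext(sc, config)

      // Each record in rdd is (rowKey, collection of (family, qualifier, value)).
      rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
        val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
        it.foreach((putRecord) => {
          val put = new Put(putRecord._1)
          putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
          bufferedMutator.mutate(put)
        })
        // Send any buffered writes before releasing the mutator.
        bufferedMutator.flush()
        bufferedMutator.close()
      })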
Related documentation
- For more operations on accessing EMR-HBase from Spark, see hbase-connectors.
- For more information about HBase, see HBase and Spark.