通過(guò)JDBC連接Spark Thrift Server提交Spark作業(yè)
本文介紹通過(guò)JDBC連接Spark Thrift Servert并成功提交Spark作業(yè)。
前提條件
連接Spark Thrift Server需要校驗(yàn)用戶名和密碼,請(qǐng)進(jìn)行用戶認(rèn)證配置,請(qǐng)參見(jiàn):用戶管理
DDI集群Spark Thrift Server默認(rèn)端口號(hào)為10001,請(qǐng)確認(rèn)成功添加安全組白名單,請(qǐng)參見(jiàn):安全組白名單
背景信息
JDBC連接Spark Thrift Server如下:
Beeline:通過(guò)HiveServer2的JDBC客戶端進(jìn)行連接。
Java:編寫(xiě)Java代碼進(jìn)行連接。
Python:編寫(xiě)Python代碼進(jìn)行連接。
Beeline客戶端連接Spark Thrift Server
執(zhí)行如下命令,進(jìn)入Beeline客戶端。
beeline
返回如下信息
Beeline version 2.3.7 by Apache Hive
執(zhí)行如下命令,連接Spark Thrift Servert。
!connect jdbc:hive2://{ddi-header-ip}:10001/{db_name}
輸入用戶名和密碼。
Enter username for jdbc:hive2://ip:10001/beijing_dlf_db_test: username
Enter password for jdbc:hive2://ip:10001/beijing_dlf_db_test: ********
查詢數(shù)據(jù),返回結(jié)果如下:
0: jdbc:hive2://ip:10001/beijing_d> select * from table_name limit 10;
+---------+-------------+
| action | date |
+---------+-------------+
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Close | 2016-07-27 |
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
+---------+-------------+
10 rows selected (15.853 seconds)
Java代碼連接Spark Thrift Server
在執(zhí)行本操作前,確保您已安裝Java環(huán)境和Java編程工具,并且已配置環(huán)境變量
Java代碼連接Spark Thrift Server需要下載Databricks提供的依賴包,下載路徑:Databricks JDBC Driver
將項(xiàng)目依賴SparkJDBC42.jar添加到編程工具的Lib下,如圖:
編寫(xiě)代碼,連接Spark Thrift Server并提交作業(yè)。
代碼如下:
import com.simba.spark.jdbc.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class SparkJDBC {
private static Connection connectViaDS() throws Exception {
Connection connection = null;
Class.forName("com.simba.spark.jdbc.Driver");
DataSource ds = new com.simba.spark.jdbc.DataSource();
ds.setURL("jdbc:spark://{ip}:10001/{db_name};AuthMech=3;UID=username;PWD=ps");
connection = ds.getConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection connection = connectViaDS();
System.out.println("日志打印:"+connection.getClientInfo());
PreparedStatement tables = connection.prepareStatement("show databases ");
ResultSet res = tables.executeQuery();
while (res.next()){
System.out.println("database_name :"+ res.getString(1));
}
res.close();
tables.close();
connection.close();
}
}
本地測(cè)試是否正常執(zhí)行。
執(zhí)行代碼過(guò)程中如果出現(xiàn)NoSuchFileException請(qǐng)聯(lián)系DDI運(yùn)維人員。
Python代碼連接Spark Thrift Server
使用PyHive連接Spark Thrift Server需要依的賴包安裝如下:
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
編寫(xiě)Python代碼,連接Spark Thrift Server并提交作業(yè)。
代碼如下:vim sparkJDBC.py
from pyhive import hive
conn = hive.Connection(host='host',database='db_name', port=10001, username='username',password='ps',auth="LDAP")
cursor=conn.cursor()
cursor.execute('select * from table_name limit 10')
for result in cursor.fetchall():print(result)
conn.close()
執(zhí)行Python代碼結(jié)果如下:
python sparkJDBC.py
執(zhí)行結(jié)果:
('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')