本文介紹Spark如何訪問MySQL。
Spark RDD訪問MySQL
示例代碼如下。
val input = getSparkContext.textFile(inputPath, numPartitions)
input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
.mapPartitions(e => {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = s"insert into $tbName(word, count) values (?, ?)"
try {
conn = DriverManager.getConnection(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", dbUser, dbPwd)
ps = conn.prepareStatement(sql)
e.foreach(pair => {
ps.setString(1, pair._1)
ps.setLong(2, pair._2)
ps.executeUpdate()
})
ps.close()
conn.close()
} catch {
case e: Exception => e.printStackTrace()
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
Iterator.empty
}).count()
spark-sql訪問MySQL
訪問命令如下。
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,mysql-connector-java-8.0.30.jar
說明
mysql-connector-java-8.0.30.jar
包含了MySQL JDBC Driver,請根據實際地址填寫MySQL Driver的版本以及路徑。/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
中包含JDBC2 DataSource類型。如果您EMR集群使用的是Spark2,則應修改上面命令中的spark3
為spark2
。
建表和讀取數據示例如下。
//建表
create table test1(id int)
using jdbc2
options(
url="jdbc:mysql://mysql_url/test_db?user=root&password=root",
dbtable="test1",
driver="com.mysql.jdbc.Driver");
//讀取MySQL
select * from test1;
//寫入MySQL
insert into test1 values(1);
相關文檔
文檔內容是否對您有幫助?