SparkSQL

前言

Spark SQL踩坑

SparkSQL就是将大量的数据加载到内存中(本身这些数据都是规则的,相当于MySQL数据库中的表一样),然后用SQL语句进行查询,查询的结果也可以再导入MySQL

准备工作

安装MySQL,我最开始是在Windows上安装的MySQL,Windows和Linux虚拟机能ping通,无奈,程序跑起来Linux上没有权限访问Windows上的数据库,试了很久没搞定,于是就在Linux上装MySQL,和Spark程序在一台节点上

准备一些数据

1
2
3
4
5
6
7
8
9
10
1 小明 22
2 小红 21
3 小张 25
4 小王 19
5 小陈 17
6 zc 18
7 pang 39
8 yuan 28
9 小文 10
10 liu 12

有规则的数据,上传数据到HDFS上

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
//创建样例类Student
case class Student(id: Int, name: String, age: Int)
object SparkSqlToMysql {
def main(args: Array[String]): Unit = {
//1、创建sparkSession对象
val spark: SparkSession = SparkSession.builder()
.appName("SparkSqlToMysql")
.getOrCreate()
//2、读取数据
val data: RDD[String] = spark.sparkContext.textFile("hdfs://hadoop01:9000/student")
//3、切分每一行,
val arrRDD: RDD[Array[String]] = data.map(_.split(" "))
//4、RDD关联Student
val studentRDD: RDD[Student] = arrRDD.map(x => Student(x(0).toInt, x(1), x(2).toInt))
//导入隐式转换
import spark.implicits._
//5、将RDD转换成DataFrame
val studentDF: DataFrame = studentRDD.toDF()
//6、将DataFrame注册成表
studentDF.createOrReplaceTempView("student")
//7、操作student表 ,按照年龄进行降序排列
val resultDF: DataFrame = spark.sql("select * from student order by age desc")
//8、把结果保存在mysql表中
//创建Properties对象,配置连接mysql的用户名和密码
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "123456")
resultDF.write.jdbc("jdbc:mysql://hadoop01:3306/spark?useUnicode=true&characterEncoding=UTF-8", "student", prop)
//写入mysql时,可以配置插入mode,overwrite覆盖,append追加,ignore忽略,error默认表存在报错
//resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)
spark.stop()
}
}

函数textFile的功能特别强大,文件路径支持通配符,比如你要一次性导入文件夹,或者多个文件只需要

1
textFile(/xxx/*/*)

就可以全部导入RDD中

打包运行踩坑

运行,指定MySQL连接驱动包

1
./spark-submit --class demo03.SparkSqlToMysql --executor-memory 500m --jars /root/Desktop/mysql-connector-java-5.1.38.jar --driver-class-path /root/Desktop/mysql-connector-java-5.1.38.jar /root/Desktop/scalaDemo.jar

也可以运行spark-shell

1
./spark-shell --executor-memory 500m --jars /root/Desktop/mysql-connector-java-5.1.38.jar --driver-class-path /root/Desktop/mysql-connector-java-5.1.38.jar

通过shell直接操作