这样看起来就像一张表了

发布时间:2025-06-24 17:39:43  作者:北方职教升学中心  阅读量:885



-总结:
DataFrame 就是一个分布式的表;
DataFrame = RDD - 泛型 + SQL 的操作 + 优化。DataSet 的区别

1.结构图解:
-RDD[Person]:
以 Person 为类型参数,但不了解 其内部结构。DataSet 的区别

  • 4) 总结
  • 6. Spark SQL 应用
    • 1) 创建 DataFrame/DataSet
    • 2) 两种查询风格:DSL 和 SQL
    • 3) Spark SQL 完成 WordCount
    • 4) Spark SQL 多数据源交互
  • 1. 数据分析方式

    1 ) 命令式
    在前面的 RDD 部分, 非常明显可以感觉的到是命令式的, 主要特征是通过一个算子, 可以得到一个结果, 通过结果再进行后续计算。

    1) 创建 DataFrame/DataSet

    -读取文本文件:
    1.在本地创建一个文件,有 id、
    而 Spark 出现了以后,统一了两种数据处理范式是一种革新性的进步。
    与 RDD 相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表。
    在这里插入图片描述

    4. 数据分类和 SparkSQL 适用场景

    1) 结构化数据

    一般指数据有固定的 Schema(约束),例如在用户表中,name 字段是 String 型,那么每一条数据的 name 字段值都可以当作 String 来使用:
    在这里插入图片描述

    2) 半结构化数据

    般指的是数据没有固定的 Schema,但是数据本身是有结构的。
    新的问题:
    Shark 执行计划的生成严重依赖 Hive,想要增加新的优化非常困难;
    Hive 是进程级别的并行,Spark 是线程级别的并行,所以 Hive 中很多线程不安全的代码不适用于 Spark;
    由于以上问题,Shark 维护了 Hive 的一个分支,并且无法合并进主线,难以为继;
    在 2014 年 7 月 1 日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上。
    SparkSQL-Dataset
    SparkSQL 在 1.6 时代,增加了一个新的 API,叫做 Dataset,Dataset 统一和结合了 SQL 的访问和命令式 API 的使用,这是一个划时代的进步。

    3. Hive 和 SparkSQL

    Hive 是将 SQL 转为 MapReduce。

    vim /root/person.txt1 zhangsan 202 lisi 293 wangwu 254 zhaoliu 305 tianqi 356 kobe 40

    2.打开 spark-shell

    spark/bin/spark-shell

    创建 RDD

    val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]

    3.定义 case class(相当于表的 schema)

    case class Person(id:Int, name:String, age:Int)

    4.将 RDD 和 case class 关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
    5.将 RDD 转换成 DataFrame

    val personDF = personRDD.toDF //DataFrame

    6.查看数据和 schema

    personDF.show+---+--------+---+| id|    name|age|+---+--------+---+|  1|zhangsan| 20||  2|    lisi| 29||  3|  wangwu| 25||  4| zhaoliu| 30||  5|  tianqi| 35||  6|    kobe| 40|+---+--------+---+personDF.printSchema

    7.注册表

    personDF.createOrReplaceTempView("t_person")

    8.执行 SQL

    spark.sql("select id,name from t_person where id > 3").show

    9.也可以通过 SparkSession 构建 DataFrame

    val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")

    dataFrame.show //注意:直接读取的文本文件没有完整schema信息
    dataFrame.printSchema
    -读取 json 文件:

    val jsonDF= spark.read.json("file:///resources/people.json")

    接下来就可以使用 DataFrame 的函数操作
    jsonDF.show
    注意:直接读取 json 文件有 schema 信息,因为 json 文件本身含有 Schema 信息,SparkSQL 可以自动解析。结构化;
    SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)。
    Spark2.0 中两者统一,DataFrame 表示为 DataSet[Row],即 DataSet 的子集。
    在 Spark 出现之前,对于结构化数据的查询和处理, 一个工具一向只能支持 SQL 或者命令式,使用者被迫要使用多个工具来适应两种场景,并且多个工具配合起来比较费劲。
    -没有固定 Schema
    指的是半结构化数据是没有固定的 Schema 的,可以理解为没有显式指定 Schema。这样看起来就像一张表了。
    HiveContext 通过 hive sql 语句操作 hive 表数据,兼容 hive 操作,hiveContext 继承自 SQLContext。

    3) 总结

    -数据分类总结:
    -定义 特点 举例
    在这里插入图片描述

    -Spark 处理什么样的数据?
    RDD 主要用于处理非结构化数据 、

    3 ) 总结
    SQL 擅长数据分析和通过简单的语法表示查询,命令式操作适合过程式处理和算法性的处理。name、age 三列,用空格分隔,然后上传到 hdfs 上。
    SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行。
    -总结:
    SparkSQL 是一个既支持 SQL 又支持命令式数据处理的工具;
    SparkSQL 的主要适用场景是处理结构化数据(较为规范的半结构化数据也可以处理)。

    2. SparkSQL 前世今生

    SQL 是数据分析领域一个非常重要的范式,所以 Spark 一直想要支持这种范式,而伴随着一些决策失误,这个过程其实还是非常曲折的。
    比如说一个用户信息的 JSON 文件,
    第 1 条数据的 phone_num 有可能是数字,
    第 2 条数据的 phone_num 虽说应该也是数字,但是如果指定为 String,也是可以的,
    因为没有指定 Schema,没有显式的强制的约束。
    例如 JSON 文件,其中的某一条数据是有字段这个概念的,每个字段也有类型的概念,所以说 JSON 是可以描述自身的,也就是数据本身携带有元信息。

    2 ) SQL
    对于一些数据科学家/数据库管理员/DBA, 要求他们为了做一个非常简单的查询, 写一大堆代码, 明显是一件非常残忍的事情, 所以 SQL on Hadoop 是一个非常重要的方向。
    在 Dataset 中可以轻易的做到使用 SQL 查询并且筛选数据,然后使用命令式 API 进行探索式分析。

    5. Spark SQL 数据抽象

    1) DataFrame

    -什么是 DataFrame
    DataFrame 的前身是 SchemaRDD,从 Spark 1.3.0 开始 SchemaRDD 更名为 DataFrame。
    2.命令式的缺点
    需要一定的代码功底;
    写起来比较麻烦。

    sc.textFile("...")  .flatMap(_.split(" "))  .map((_, 1))  .reduceByKey(_ + _)  .collect()

    1.命令式的优点
    操作粒度更细,能够控制数据的每一个处理环节;
    操作更明确,步骤更清晰,容易维护;
    支持半/非结构化数据的操作。
    调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!
    DataSet 包含了 DataFrame 的功能。DataFrame、

    SELECT   name,   age,   schoolFROM studentsWHERE age > 10

    1.SQL 的优点
    表达非常清晰, 比如说这段 SQL 明显就是为了查询三个字段,条件是查询年龄大于 10 岁的。

    2.数据图解:
    -假设 RDD 中的两行数据长这样:

    • RDD[Person]:
      -那么 DataFrame 中的数据长这样:
      DataFrame = RDD[Person] - 泛型 + Schema + SQL 操作 + 优化:
      -那么 Dataset 中的数据长这样:
      Dataset[Person] = DataFrame + 泛型:
      -Dataset 也可能长这样:Dataset[Row]:
      即 DataFrame = DataSet[Row]:

    4) 总结

    DataFrame = RDD - 泛型 + Schema + SQL + 优化
    DataSet = DataFrame + 泛型
    DataSet = RDD + Schema + SQL + 优化

    6. Spark SQL 应用

    -在 spark2.0 版本之前
    SQLContext 是创建 DataFrame 和执行 SQL 的入口。
    如果想使用 SQL 风格的语法,需要将 DataFrame 注册成表,采用如下的方式:

    personDF.createOrReplaceTempView("t_person")spark.sql("select * from t_person").show

    1.显示表的描述信息

    spark.sql("desc t_person").show

    2.查询年龄最大的前两名

    spark.sql("select * from t_person order by age desc limit 2").show

    3.查询年龄大于 30 的人的信息

    spark.sql("select * from t_person where age > 30 ").show

    4.使用 SQL 风格完成 DSL 中的需求

    spark.sql("select name, age + 1 from t_person").showspark.sql("select name, age from t_person where age > 25").showspark.sql("select count(age) from t_person where age > 30").showspark.sql("select age, count(age) from t_person group by age").show

    -总结:
    1.DataFrame 和 DataSet 都可以通过 RDD 来进行创建;
    2.也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过 RDD+Schema;
    3.通过 josn/parquet 会有完整的约束;
    4.不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了! 也可以使用 DSL!

    3) Spark SQL 完成 WordCount

    -SQL 风格:

    import org.apache.spark.SparkContextimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount {  def main(args: Array[String]): Unit = {    //1.创建SparkSession    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()    val sc: SparkContext = spark.sparkContext    sc.setLogLevel("WARN")    //2.读取文件    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")    //fileDF.show()    //fileDS.show()    //3.对每一行按照空格进行切分并压平    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String    import spark.implicits._    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String    //wordDS.show()    /*    +-----+    |value|    +-----+    |hello|    |   me|    |hello|    |  you|      ...     */    //4.对上面的数据进行WordCount    wordDS.createOrReplaceTempView("t_word")    val sql =      """        |select value ,count(value) as count        |from t_word        |group by value        |order by count desc      """.stripMargin    spark.sql(sql).show()    sc.stop()    spark.stop()  }}

    -DSL 风格:

    import org.apache.spark.SparkContextimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount2 {  def main(args: Array[String]): Unit = {    //1.创建SparkSession    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()    val sc: SparkContext = spark.sparkContext    sc.setLogLevel("WARN")    //2.读取文件    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")    //fileDF.show()    //fileDS.show()    //3.对每一行按照空格进行切分并压平    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String    import spark.implicits._    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String    //wordDS.show()    /*    +-----+    |value|    +-----+    |hello|    |   me|    |hello|    |  you|      ...     */    //4.对上面的数据进行WordCount    wordDS.groupBy("value").count().orderBy($"count".desc).show()    sc.stop()    spark.stop()  }}

    4) Spark SQL 多数据源交互

    -读数据:
    读取 json 文件:

    spark.read.json("D:\\data\\output\\json").show()

    读取 csv 文件:

    spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()

    读取 parquet 文件:

    spark.read.parquet("D:\\data\\output\\parquet").show()

    读取 mysql 表:

    val prop = new Properties()    prop.setProperty("user","root")    prop.setProperty("password","root")spark.read.jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
    • 写数据:
      写入 json 文件:
    personDF.write.json("D:\\data\\output\\json")

    写入 csv 文件:

    personDF.write.csv("D:\\data\\output\\csv")

    写入 parquet 文件:

    personDF.write.parquet("D:\\data\\output\\parquet")

    写入 mysql 表:

    val prop = new Properties()    prop.setProperty("user","root")    prop.setProperty("password","root")personDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)