Spark:DataFrame介绍及使用

发布时间:2025-06-24 17:08:51  作者:北方职教升学中心  阅读量:366


1. DataFrame详解

DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。

  • Row 类型 表示一行数据
    • DataFrame就算是多行构成
# 导入行类Rowfrompyspark.sql importRow# 创建行数据r1 =Row(1,'张三',20)# 行数取取值 按照下标取值data =r1[0]print(data)data1 =r1[1]print(data1)# 指定字段创建行数据r2 =Row(id=2,name='李四',age=22)# 按照字段取值data3 =r2['id']print(data3)data4 =r2['name']print(data4)
  • schema表信息
    • 定义DataFrame中的表的字段名和字段类型。
# 导入数据类型frompyspark.sql.types import*# 定义schema信息# 使用StructType类进行定义# add()方法是指定字段信息# 第一参数,字段名# 第二个参数,字段信息# 第三个参数是否允许为空值  默认是True,允许为空schema_type =StructType().\    add('id',IntegerType()).\    add('name',StringType()).\    add('age',IntegerType(),False)

2. DataFrame创建

创建datafram数据需要使用一个sparksession的类创建,SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext。

2.1 基本创建

#DataFrame 的基本创建#Row就是行数据定义的类frompyspark.sql importRow,SparkSessionfrompyspark.sql.types import*#行数据创建r1 =Row(1,"刘向阳",23,'男')print(r1)#行数据下标取值print(r1[0])print(r1[1])#创建行数据时可以指定字段名r2 =Row(id=2,name='李四',age=20,gender='女')print(r2)#使用字段名取值print(r2['name'])# 定义元数据schema =(StructType().add('id',IntegerType()).add('username',StringType()).add('age',IntegerType()).add('gender',StringType()))print(schema)# 将元数据和行数据放在一起合成DataFramess =SparkSession.builder.getOrCreate()# 调用创建df的方法df =ss.createDataFrame([r1,r2],schema=schema)# 查看df中数据df.show()#查看元数据信息df.printSchema()

运行结果:
在这里插入图片描述

2.2 RDD和DF之间的转化

  • rdd的二维数据转化为DataFrame
    • rdd.toDF()
      在这里插入图片描述
# rdd 和 dataframe的转化frompyspark.sql importSparkSession#创建SparkSession对象ss =SparkSession.builder.getOrCreate()#基于ss对象获取sparkContextsc =ss.sparkContext#创建rdd , 要使用二维列表指定每行数据rdd =sc.parallelize([[1,'张三',20,'男'],[2,'李四',20,'男']])#将rdd转为dfdf =rdd.toDF(schema='id int,name string,age int,gender string')#df数据查看df.show()df.printSchema()#df可以转rddres =df.rdd.collect()print(res)rdd2 =df.rdd.map(lambdax:x['name'])res2 =rdd2.collect()print(res2)

运行结果:
在这里插入图片描述

2.3 pandas和spark之间转化

  • spark的df转为pandas的df
    • toPandas
#pandas 和 spark的dataframe转化frompyspark.sql importSparkSessionimportpandas aspdss =SparkSession.builder.getOrCreate()#创建pandas的dfdf_pd =pd.DataFrame({'id':[1,2,3,4],'name':['张三','李四','王五','赵六'],'age':[1,2,3,4],'gender':['男','女','女','女']})#查看数据print(df_pd)#取值name =df_pd['name'][0]print(name)# 将pandas中的df转为spark的dfdf_spark =ss.createDataFrame(df_pd)#查看df_spark.show()#取值row =df_spark.limit(1).first()print(row['name'])#将spark的df重新转为pandas的dfdf_pandas =df_spark.toPandas()print(df_pandas)

运行结果:
在这里插入图片描述

2.4 读取文件数据转为df

通过read方法读取数据转为df

  • ss.read
#读取文件转为dffrompyspark.sql importSparkSessionss =SparkSession.builder.getOrCreate()#读取不同文件数据转为df# txt文件df =ss.read.text('hdfs://node1:8020/data/students.txt')df.show()# json 文件df_json =ss.read.json('hdfs://node1:8020/data/baike_qa_valid.json')df_json.show()#orc文件df_orc =ss.read.orc('hdfs://node1:8020/data/users.orc')df_orc.show()#去取csv文件#header或csv文件中的第一行作为表头字段数据df_csv =ss.read.csv('hdfs://node1:8020/data/students.csv')df_csv.show()

3. DataFrame基本使用

3.1 SQL语句

使用sparksession提供的sql方法,编写sql语句执行

#使用sql操作dataframe结构化数据frompyspark.sql importSparkSessionss =SparkSession.builder.getOrCreate()#读取文件数据转为dfdf_csv =ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')#使用sql操作df数据#将df指定一个临时表名df_csv.createTempView('stu')#编写sql字符串语句,支持hivesql语法sql_str ="""select * from stu """#执行sql语句,执行结果返回一个新的dfdf_res =ss.sql(sql_str)df_csv.show()df_res.show()

3.2 DSL方法

DSL方法是df提供的数据操作函数
使用方式:

  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()

spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据。
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df

#使用DSL方法操作dataframefrompyspark.sql importSparkSessionss =SparkSession.builder.getOrCreate()#读取文件数据转为dfdf_csv =ss.read.csv('hdfs://node1/data/students.csv',header=True,sep=',')#使用DSL方法对df数据进行操作df2 =df_csv.select('id','name')#查看结果df2.show()#第二种指定字段的方式df3 =df_csv.select(df_csv.age,df_csv.gender)#给字段起别名df4 =df_csv.select(df_csv.age.alias('new_age'),df_csv.gender)df4.show()#修改字段类型df_csv.printSchema()df5 =df_csv.select(df_csv.age.cast('int'),df_csv.gender)df5.printSchema()#where 的数据过滤age =20df6 =df_csv.where(f'age > {age}')df6.show()#过滤年龄大于20并且性别为女性的学生信息df7 =df_csv.where(f'age > 20 and gender = "女" ')df7.show()#使用第二种字段判断方式df8 =df_csv.where(df_csv.age ==age)df8.show()#分组聚合计算df9 =df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age')df9.show()#分组后过滤where 聚合计算时只能一次计算一个聚合数据df10 =df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age').where('sum(age) > 80')df10.show()#排序df11 =df_csv.orderBy('age')#默认排序df11.show()df12 =df_csv.orderBy('age',ascending=False)#降序df12.show()#分页df13 =df_csv.limit(5)df13.show()#转为rddres =df_csv.rdd.collect()[5:10]print(res)df_new =ss.createDataFrame(res)df_new.show()