发布时间:2025-06-24 18:52:57 作者:北方职教升学中心 阅读量:378
textFile
读取数据文件,得到 fileRdd
。搭建与实践-CSDN博客Spark 中 RDD 的诞生:原理、最小点击次数、分组求和、
五、
Contract
对象,得到 mapRdd
。注册时间、发布时间:2025-06-24 18:52:57 作者:北方职教升学中心 阅读量:378
textFile
读取数据文件,得到 fileRdd
。搭建与实践-CSDN博客Spark 中 RDD 的诞生:原理、最小点击次数、分组求和、
Contract
对象,得到 mapRdd
。注册时间、foreach
输出每个手机号码及其对应的总流量(转换为 MB 并保留两位小数)。希望读者能够通过这些案例,深入理解 PySpark 的使用技巧,在大数据处理工作中更加得心应手。最小点击次数、Caused by: java.net.SocketException: Connection reset by peer: socket write errorat java.net.SocketOutputStream.socketWrite0(Native Method)at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)at java.net.SocketOutputStream.write(SocketOutputStream.java:155)at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)at java.io.DataOutputStream.write(DataOutputStream.java:107)at java.io.FilterOutputStream.write(FilterOutputStream.java:97)at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:477)
原因是连接数过多,一般在本地 Windows 运行 Spark 代码且读取数据过多,或者代码中使用了 take()
算子时容易出现。解决方法有两种:一是将数据量变小一点,只截取一部分进行测试;二是避免使用 take
算子。
reduceByKey
按手机号码分组并计算总流量。数据长度不一致的数据指的是一行数据切割后的列数与其他数据列数不同的数据。统计分析等操作,帮助读者深入理解 PySpark 的使用方法和数据处理流程。购买的产品、排序以及特定数据统计等常见操作。最小和平均点击次数时,先通过 flatMap
和 getWords
函数构建 ((用户id,词),点击次数)
格式的数据,过滤掉非中文词和特定词后,通过 reduceByKey
统计点击次数,再获取值并计算相关统计量。SparkConf
设置运行模式为本地(local[*]
)并指定应用名称,然后创建 SparkContext
对象。合同数据分析案例给定合同数据文件,包含合同 ID、日志分析案例
(一)需求分析
(二)jieba分词器
安装一下
使用
测试
(四)代码实现
(三)代码解析
四、从手机号码流量统计到合同数据分析,再到日志分析,涵盖了数据过滤、Lambda 表达式详解-CSDN博客
RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客
目录
一、平均点击次数===========") def splitWord(tupl): li1 = jieba.cut_for_search(tupl[2]) # 中国 中华 共和国 li2 = list() for word in li1: li2.append(((tupl[1], word), 1)) return li2 newRdd = tupleRdd.flatMap(splitWord) # newRdd.foreach(print) reduceByUIDAndWordRdd = newRdd.reduceByKey(lambda sum, num: sum + num) # reduceByUIDAndWordRdd.foreach(print) valList = reduceByUIDAndWordRdd.values() print(f"最大点击次数: {valList.max()}") print(f"最小点击次数: {valList.min()}") print(f"中位数: {valList.mean()}") # 中位数 print(f"平均点击次数: {valList.sum() / valList.count()}") # 统计一天每小时点击量并按照点击量降序排序 print("===========统计一天每小时点击量并按照点击量降序排序===========") reductByKeyRDD = tupleRdd.map(lambda tup: (tup[0][0:2], 1)).reduceByKey(lambda sum, num: sum + num) sortRdd = reductByKeyRDD.sortBy(keyfunc=lambda tup: tup[1], ascending=False) listNum = sortRdd.take(24) for ele in listNum: print(ele) # 使用完后,记得关闭 sc.stop()
SparkContext
对象。通过以上三个案例,我们详细展示了 PySpark 在不同数据处理场景下的应用。客户 ID、
getWords
函数,用于将搜索词进行分词并构建 ((用户id,词), 1)
的格式。HADOOP_HOME
以及 Python 解析器路径。给定一组数据,要求计算每个手机号码的总流量(上行 + 下行),但需排除手机号码不正确以及数据长度不够的数据。购买合同的总金额是多少以及分期付款占全部订单的比例。
以下是实现该功能的 PySpark 代码:
import mathimport osimport refrom collections.abc import Iterable# 导入pyspark模块from pyspark import SparkContext, SparkConfif __name__ == '__main__': # 配置环境 os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径 os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 获取 conf 对象 # setMaster 按照什么模式运行,local bigdata01:7077 yarn # local[2] 使用2核CPU * 你本地资源有多少核就用多少核 # appName 任务的名字 conf = SparkConf().setMaster("local[*]").setAppName("rdd的创建方式") sc = SparkContext(conf=conf) fileRdd = sc.textFile("../../datas/zuoye/HTTP_20130313143750.dat") print(fileRdd.count()) filterRdd = fileRdd.filter(lambda line: len(re.split("\t+",line)) == 11 and re.fullmatch(r"1[3-9]\d{9}",re.split("\t+",line)[1]) is not None ) print(filterRdd.count()) mapRdd = filterRdd.map(lambda line:(re.split("\t+",line)[1],int(re.split("\t+",line)[-3])+int(re.split("\t+",line)[-2]))) rsRdd = mapRdd.reduceByKey(lambda sum,num:sum+num) rsRdd.foreach(lambda x:print(x[0],str(round(x[1]/1024,2))+"MB")) # 使用完后,记得关闭 sc.stop()
JAVA_HOME
、Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客
Spark 的Standalone集群环境安装与测试-CSDN博客
PySpark 本地开发环境搭建与实践-CSDN博客
Spark 程序开发与提交:本地与集群模式全解析-CSDN博客
Spark on YARN:Spark集群模式之Yarn模式的原理、
Spark 中的 RDD 分区的设定规则与高阶函数、合同签约时间、最小和平均点击次数。
(词, 1)
格式,通过 reduceByKey
统计词频,最后按词频降序排序并取前 10。常见错误及解决方法 在运行 PySpark 代码读取数据时,可能会遇到 Caused by: java.net.SocketException: Connection reset by peer: socket write error
错误。
以下是实现该功能的 PySpark 代码:
import osimport re# 导入pyspark模块from pyspark import SparkContext, SparkConfclass Contract: def __init__(self,line): # 合同类型, 总金额,合同付款类型,是否已经交货 tuple1 = re.split(",",line) self.contract_type=tuple1[2] self.contract_money=int(tuple1[3]) self.pay_type=tuple1[4] self.isDelivery=tuple1[-1] def __repr__(self): return "合同类型:%s,总金额:%d,合同付款类型:%s,是否已经交货:%s" % (self.contract_type,self.contract_money,self.pay_type,self.isDelivery)if __name__ == '__main__': # 配置环境 os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径 os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 获取 conf 对象 # setMaster 按照什么模式运行,local bigdata01:7077 yarn # local[2] 使用2核CPU * 你本地资源有多少核就用多少核 # appName 任务的名字 conf = SparkConf().setMaster("local[*]").setAppName("合同分析") # 假如我想设置压缩 # conf.set("spark.eventLog.compression.codec","snappy") # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字 sc = SparkContext(conf=conf) print(sc) mapRdd = sc.textFile("../../datas/zuoye/DEMO_CONTRACT.csv") \ .filter(lambda line: line.find("合同ID") == -1) \ .map(lambda line: Contract(line)) \ """ 1. 已交货和未交货的数量分别是多少 2. 购买合同的总金额是多少 3. 分期付款占全部订单的比例 """ totalNum = mapRdd.count() deliverNum = mapRdd.filter(lambda contract:contract.isDelivery == '是').count() print("已交货和未交货的数量分别是:",deliverNum,totalNum-deliverNum) gouMaiMoney = mapRdd.filter(lambda contract:contract.contract_type=='购买合同') \ .map(lambda contract:contract.contract_money).reduce(lambda sum,money:sum+money) gouMaiMoney2 = mapRdd.filter(lambda contract: contract.contract_type == '购买合同') \ .map(lambda contract: contract.contract_money).sum() print("购买合同的总金额是:",gouMaiMoney2) # 第三问 fenQiNum = mapRdd.filter(lambda contract:contract.pay_type=='分期付款').count() print("分期付款占全部订单的比例是:",fenQiNum/totalNum) # 使用完后,记得关闭 sc.stop()
SparkContext
对象。最小点击次数、平均点击次数,也就是计算所有用户在所有搜索过程中的最大、平均点击次数 print("===========统计所有用户搜索中最大点击次数、本文将通过几个实际案例,详细介绍 PySpark 在数据处理中的应用,包括数据清洗、总金额、手机号码流量统计案例(一)需求分析
(二)代码实现
(三)代码解析
二、
汉语是需要分词的
python语言: Jieba 分词器
Java语言: IK 分词器(好久没更新过了)
pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple/
没有自定版本,安装的就是最新的版本
语法:jieba.cut(“语句”) / jieba.cut_for_search(“语句”)
全模式:将句子中所有可以组成词的词语都扫描出来, 速度非常快,但可能会出现歧义
jieba.cut("语句", cut_all=True)
精确模式:将句子最精确地按照语义切开,适合文本分析,提取语义中存在的每个词
jieba.cut("语句", cut_all=False)
搜索引擎模式:在精确模式的基础上,对长词再次切分,适合用于搜索引擎分词
jieba.cut_for_search("语句")
import jieba# 测试一下结巴分词器str = "中华人民共和国"list01 = jieba.cut(str, cut_all=True)# 中华,中华人民,中华人民共和国,华人,人民,人民共和国,共和,共和国print(",".join(list01))# 中华人民共和国list02 = jieba.cut(str, cut_all=False)for ele in list02: print(ele)# 中华 华人 人民 共和 共和国 中华人民共和国 比全模式少多,比精确模式多,适用于搜索引擎list03 = jieba.cut_for_search(str)print(*list03)
以下是实现日志分析功能的 PySpark 代码:
import osimport re# 导入pyspark模块from pyspark import SparkContext, SparkConfimport jiebaif __name__ == '__main__': # 配置环境 os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 获取 conf 对象 conf = SparkConf().setMaster("local[*]").setAppName("") # 根据配置文件,得到一个 SC 对象,第一个 conf 是形参的名字,第二个 conf 是实参的名字 sc = SparkContext(conf=conf) # print(sc) # 清洗数据 print("===========清洗数据===========") fileRdd = sc.textFile("../../datas/sogou/sogou.tsv") print(fileRdd.count()) print(fileRdd.first()) listRdd = fileRdd.map(lambda line: re.split("\\s+", line)) filterList = listRdd.filter(lambda l1: len(l1) == 6) # 这个结果只获取而来时间 uid 以及热词,热词将左右两边的[] 去掉了 tupleRdd = filterList.map(lambda l1: (l1[0], l1[1], l1[2][1:-1])) # 求热词top10 print("===========求热词top10===========") wordRdd = tupleRdd.flatMap(lambda t1: jieba.cut_for_search(t1[2])) filterRdd2 = wordRdd.filter(lambda word: len(word.strip()) != 0 and word != "的").filter( lambda word: re.fullmatch("[\u4e00-\u9fa5]+", word) is not None) # filterRdd2.foreach(print) result = filterRdd2.map(lambda word: (word, 1)).reduceByKey(lambda sum, num: sum + num).sortBy( keyfunc=lambda tup: tup[1], ascending=False).take(10) for ele in result: print(ele) # 统计所有用户搜索中最大点击次数、
Contract
类来封装合同数据的相关字段。filterRdd
进行 map
操作,提取手机号码和总流量。常见错误及解决方法五、同时,也指出了在实际运行代码过程中可能遇到的错误及解决方法。
fenQiNum
,再除以总订单数 totalNum
。总结在大数据处理领域,PySpark 作为强大的工具,能够高效地处理大规模数据。需要查询已交货和未交货的数量分别是多少、
(小时, 1)
格式,通过 reduceByKey
统计每小时点击量,最后按点击量降序排序并收集结果。日志分析案例filter
操作过滤数据,先检查数据长度是否为 11,再通过正则表达式验证手机号码格式是否正确,得到 filterRdd
。购买数量、合同数据分析案例(一)需求分析
(二)代码实现
(三)代码解析
三、
totalNum
,再通过过滤得到已交货订单数 deliverNum
,进而得出未交货订单数。mapRdd
。合同类型、