搜索

pyspark 中的rdd api 编码练习


发布时间: 2022-11-24 18:33:01    浏览次数:68 次

1,使用pyspark 的rdd api 进行了数据文件的处理,包括构建RDD, 统计分析RDD ,从文件中读取数据RDD,从文件中构建 rdd的模式shema. 

然后通过模式,从rdd中生成dataframe。

 

2,代码

 

''' 构建sparkSession 和练习数据(RDD 和 KV rdd) ''' spark = SparkSession.builder.appName("rdd_api_test") \ .master("local[2]") \ .getOrCreate() sc = spark.sparkContext rdd1 = sc.parallelize([1, 5, 60, 'a', 9, 'c', 4, 'z', 'f']) rdd2 = sc.parallelize([('a', 6), ('a', 1), ('b', 2), ('c', 5), ('c', 8), ('c', 11)]) ''' 查看rdd元素 , 元素个数, KV对RDD中key的出现次数, 分区个数等常用api '''
print(rdd2.collect()) print (rdd2.take(2)) print('amount of elements:', rdd2.count()) print('RDD count of key:', rdd2.countByKey()) print('RDD output as map:', rdd2.collectAsMap()) print('RDD number of partitions:', rdd2.getNumPartitions()) ''' 数值型rdd ,常用统计函数, 最小,最大 ,平均 , 标准差,方差 ''' rdd5 = sc.parallelize(range(100)) print('RDD Min:', rdd5.min())  # rdd 最小值
print('RDD Max:', rdd5.max()) print('RDD Mean:', rdd5.mean()) print('RDD Standard deviation:', rdd5.stdev()) print('RDD Variance:', rdd5.variance()) ''' 从文件读取数据,并且去掉第一行列名,进行显示 数据源:nba.csv ''' full_csv = sc.textFile('nba.csv') header = full_csv.first() print(full_csv.filter(lambda line: line != header).take(4))  # 去掉头行后,看效果


''' 从文档中读出文件头部,设置rdd模式,然后把RDD转化为df 数据源:本地文件:customerheaders.txt 数据样例: id:string full_name:string ... '''
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField header_list = sc.textFile('customerheaders.txt') \ .map(lambda line: line.split(":")).collect()  # 返回数组,元素为[id string]...


def strToType(str):  # string 映射到 DataType
    if str == 'int': return IntegerType() elif str == 'double': return DoubleType() else: return StringType() schema = StructType([StructField(t[0], strToType(t[1]), True) for t in header_list])  # 列表构造器 构造 StructType
for item in schema: print(item) ''' 对原始文件rdd中每一行进行规范化处理 数据举例: 7,Mohandas MacRierie,mmacrierie0@xrea.com,11/24/1990,Fliptune,-7.1309871,111.591546, 254,Rita Slany,rslany1@ucla.edu,8/7/1961,Yodoo,48.7068855,2.3371075, ''' customers_rdd = sc.textFile('customers.txt') ''' 一行字符串的解析函数 逻辑: 1,取出 '''
def parseLine(line): tokens = zip(line.split(","), header_list)  # 打包为元组的列表, [(7, [id string]),(Mohandas, [name string]), ... ]
    parsed_tokens = [] for token in tokens: token_type = token[1][1]  # 取到数据的类型,然后转化该类型,并放入parsed_tokens
        print('token_type = ', token_type) if token_type == 'double': parsed_tokens.append(float(token[0])) elif token_type == 'int': parsed_tokens.append(int(token[0])) else: parsed_tokens.append(token[0]) return parsed_tokens records = customers_rdd.map(parseLine)  # 把文本字符串根据模式中对应的类型,转化为对象

for item in records.take(4): print(item) df = spark.createDataFrame(records, schema)  # rdd --> df
print (df) ''' 其他一些API介绍: rdd.foreach([FUNCTION]): 对每个元素执行函数 rdd.groupBy([CRITERA]): 分组聚合 like: ('a', 1) ('b',2) ('a', 3) --> ('a',Iterable(1,3)) ('b', 2) rdd.subtract(rdd2): 做差集计算,元素在rdd中出现,没有在rdd2中出现 rdd.subtractByKey(rdd2): 同上,适用于KV rdd rdd.sortBy([FUNCTION]): 自定义RDD元素排序 rdd.sortByKey(): 按照key 进行排序,其中key的类型必须实现了排序逻辑 rdd.join(rdd2): like : ('a', 1) ('b',2) ('a', 3) --> ('a',(1,3)) ('b', 2) '''

 

运行结果:

 

 

 

 

免责声明 pyspark 中的rdd api 编码练习,资源类别:文本, 浏览次数:68 次, 文件大小:-- , 由本站蜘蛛搜索收录2022-11-24 06:33:01。此页面由程序自动采集,只作交流和学习使用,本站不储存任何资源文件,如有侵权内容请联系我们举报删除, 感谢您对本站的支持。 原文链接:https://www.cnblogs.com/gao1261828/p/16385331.html