一、RDD常见操作
1.转换操作(Transformation)
对一个数据为{1,2,3,3}的RDD进行基本的RDD转换操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
map() | 将函数应用于RDD中的每个元素,将返回值构成新的RDD | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap() | 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词。map+拍扁 | rdd.flatMap(x => x.to(3)) | {1, 2, 3,2,3,3,3} |
filter() | 返回一个由通过传给filter()的函数的元素组成RDD | rdd.filter(x => x != 1) | {2,3,3} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(withReplacement,fraction,[seed]) | 对RDD采样,以及是否替换 | rdd.sample(false,0.5) | 非确定的 |
对数据分别为{1,2,3}和{3,4,5}的RDD进行针对两个RDD的转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 生成一个包含两个RDD中所有元素的RDD(并集) | rdd.union(other) | {1,2,3,3,4,5} |
intersection() | 求两个RDD共同元素的RDD(交集) | rdd.intersection(other) | {3} |
subtract() | 移除一个RDD中的内容(例如移除训练数据) | rdd.subtract(other) | {1,2} |
cartesian() | 与另一个RDD的笛卡尔积 | rdd.cartesian(other) | {(1,3),(1,4),…,(3,5)} |
2.行动操作(Action)
对一个数据为{1,2,3,3}的RDD进行基本的RDD行动操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
collect() | 返回RDD中的所有元素 | rdd.collect() | {1,2,3,3} |
count() | RDD中的元素个数 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出现的次数 | rdd.countByValue() | {(1,1),(2,1),(3,2)} |
take(num) | 从RDD中返回num个元素 | rdd.take(2) | {1,2} |
top(num) | 从RDD中返回最前面的num个元素 | rdd.top(2) | {3,3} |
takeOrdered(num)(ordering) | 从RDD中按照提供的顺序返回最前面的num个元素 | rdd.takeOrdered(2)(myOrdering) | {3,3} |
takeSample(withReplacement,num,[seed]) | 从RDD中返回任意一些元素 | rdd.takeSample(false,1) | 非确定性 |
reduce(func) | 并行整合RDD中所有数据(例如sum) | rdd.reduce((x, y) => x + y) | 9 |
fold(zero)(func) | 和reduce()一样,但是需要提供初始值 | rdd.fold(0)((x, y) => x + y) | 9 |
aggregate(zeroValue)(seqOp,combOp) | 和reduce相似,但是通常返回不同类型的函数 | rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1),(x, y) => (x._1 + y._1, x._2 + y._2)) | (9,4) |
foreach(func) | 对RDD中的每个元素使用给定的函数 | rdd.foreach(func) | 无 |
二、Pair RDD常见操作
1.转换操作(Transformation)
Pair RDD的转换操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
reduceByKey(func) | 合并具有相同的键的值 | rdd.reduceByKey((x, y) => x + y) | {(1,2),(3,10)} |
groupByKey() | 对具有相同键的值进行分组 | rdd.groupByKey() | {(1,[2]),(3,[4,6])} |
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) | 使用不同的返回类型合并具有相同键的值 | 无 | |
mapValues(func) | 对pair RDD中的每个值应用一个函数而不改变键 | rdd.mapValues(x => x + 1) | {(1, 3), (3, 5), (3, 7)} |
flatMapValues(func) | 对pairRDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。通常用户符号化 | rdd.flatMapValues( x => (x to 5)) | {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)} |
keys() | 返回一个仅包含键的RDD | rdd.keys | {1,3,3} |
values() | 返回一个仅包含值的RDD | rdd.values | {2,4,6} |
sortByKey() | 返回一个根据键排序的RDD | rdd.sortByKey() | {(1, 2), (3, 4), (3, 6)} |
针对两个pair RDD的转换操作(rdd = {(1, 2), (3, 4), (3, 6)} other={(3,9)})
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
subtractByKey | 删掉RDD中键与other RDD中的键相同的元素 | rdd.subtractByKey(other) | {(1,2)} |
join | 对两个RDD进行内连接 | rdd.join(other) | {(3, (4, 9)),(3,(6, 9))} |
rightOuterJoin | 对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接) | rdd.rightOuterJoin(other) | {(3,(Some(4),9)),(3,(Some(6),9))} |
leftOuterJoin | 对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接) | rdd.leftOuterJoin(other) | {(1,(2,None)),(3,(Some(4),9)),(3,(Some(6),9)))} |
cogroup | 将两个RDD中拥有相同键的数据分组到一起 | rdd.cogroup(other) | {(1,([2],[])),(3,([4,6],[9]))} |
2.行动操作(Action)
和转换操作一样,所有基础RDD支持的传统行动操作也都在pair RDD上可用。
Pair RDD的行动操作(以键值对集合{(1, 2), (3, 4),(3, 6)}为例)
函数 | 描述 | 示例 | 结果 |
---|---|---|---|
countByKey() | 对每个键对应的元素分别计数 | rdd.countByKey() | {(1, 1),(3, 2)} |
collectAsMap() | 将结果以映射表的形式返回,以便查询 | rdd.collectAsMap() | Map{(1,2),(3,6)} |
lookup(key) | 返回给定键对应的所有值 | rdd.lookup(3) | [4, 6] |
三、一些Spark实例
1.WordCount
文件包含很多行文本,每行文本由多个单词构成,单词之间用空格分隔。
1 | scala> val lines = sc.textFile("/word.txt") |
或
1 | scala> lines.map(word => (word,1)).groupByKey().map(t => (t._1,t._2.sum)) |
纯Scala版
1 | import java.io.{File, PrintWriter} |
2.统计平均销量
给定一组键值对(“spark”, 2)、(“hadoop”, 6)、(“hadoop”, 4)、(“spark”, 6),key表示图书名称,value表示某天图书销量,计算每种图书的每天的平均销量。
1 | scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark"),6)) |
3.求TopN值
文本包含很多行数据,每行数据由4个字段的值构成,不同值之间用逗号隔开,4个字段分别为orderid、userid、payment和productid,要求求出TopN个payment值。
1 | 1,1768,50,155 |
代码如下:
1 | import org.apache.spark.{SparkConf, SparkContext} |
4.多文本排序
代码如下:
1 | import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} |
5.二次排序
首先根据第1列数据降序排序,如果第1列数据相等,则根据第2列数据降序排序。
1 | 输入文件file1.txt |
SecondarySortKey.scala
1 | package sort |
实现二次排序功能的代码文件SecondarySortApp.scala
1 | package sort |
Spark运行命令:
1 | /home/hadoop/install/spark-2.2.0-bin-hadoop2.7/bin/spark-submit --master spark://master:7077 --class sort.SecondarySortApp /home/hadoop/personaldoc/liwen/sparkapp-1.0-SNAPSHOT.jar |
6.Spark-Shell交互式编程
计算机系的成绩如下:
1 | Tom,DataBase,26 |
- 1.该系总共有多少学生
1 | var n = 0 |
或
1 | lines.map(line => (line.split(",")(0),1)).groupByKey().count |
或
1 | lines.map(line => line.split(",")(0)).distinct().count |
- 2.该系共开设来多少门课程
1 | lines.map(line => line.trim.split(",")(1)).distinct().count |
- 3.Tom 同学的总成绩平均分是多少
1 | val tom = lines.filter(line => line.split(",")(0) == "Tom") |
- 4.求每名同学的选修的课程门数
1 | lines.map(line => (line.split(",")(0),line.split(",")(1))) |
- 5.该系DataBase课程共有多少人选修
1 | lines.filter(line => line.split(",")(1) == "DataBase") |
或
1 | lines.filter(line => line.split(",")(1) == "DataBase").count |
- 6.各门课程的平均分是多少
1 | lines.map(line => (line.split(",")(1),line.split(",")(2).toInt)) |
- 7.使用累加器计算共有多少人选了DataBase这门课
1 | val pair = lines.filter(line => line.split(",")(1) == "DataBase").map(line => (line.split(",")(1),1)) |
7.数据去重
对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。
A文件样例如下:
1 | 20170101 x |
B文件样例如下:
1 | 20170101 y |
合并得到的输出文件C的样例如下:
1 | 20170101 x |
代码如下:
1 | import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} |
8.多文本求平均值
Algorithm成绩:
1 | 小明 92 |
Database成绩:
1 | 小明 95 |
Python成绩:
1 | 小明 82 |
平均成绩如下:
1 | (小红,83.67) |
代码如下:
1 | import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} |