Before RDDs appeared, MapReduce was the dominant processing model. In MapReduce, multiple jobs can pass data to one another only through disk, which is clearly inefficient. RDDs handle this differently: the whole pipeline shares data in memory, so intermediate results do not need to be written to a distributed file system. This approach offers more flexibility and faster execution while still guaranteeing fault tolerance.
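A minimal sketch of this idea (assuming sc is the SparkContext declared later in this article; the path is the one used in the examples below): two computations reuse the same intermediate RDD held in memory instead of writing it to a distributed file system between jobs.
// Build an intermediate result once and keep it in memory
val cleaned = sc.textFile("hdfs://test/spark/input3/words.txt").filter(_.nonEmpty).cache()
// Both follow-up computations read the cached data from memory, not from storage
println(cleaned.flatMap(_.split(" ")).count())
println(cleaned.map(_.toUpperCase).count())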
An RDD is not only a dataset but also a programming model that exposes a high-level API. The RDD API is very similar to the Stream API that JDK 8 provides for collection operations: both are built out of operators, for example:
// textFile is an RDD[String], e.g. val textFile = sc.textFile(...); StringUtils comes from Apache Commons Lang
textFile.filter(StringUtils.isNotBlank) // filter out blank lines
  .flatMap(_.split(" "))                // split on spaces
  .map((_, 1))                          // build (word, 1) pairs
  .foreach(s => println(s._1 + " " + s._2)) // print each pair
RDD operators fall roughly into two categories:
Transformations: map, flatMap, filter, etc.
Actions: reduce, collect, show, etc.
Note: RDDs are evaluated lazily. When a transformation is reached it is not executed immediately; the real computation is triggered only when an Action is encountered.
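A minimal sketch of this lazy behaviour (Scala, assuming sc is the SparkContext declared in the next section): the map transformation only records what to do; nothing runs until the collect action is called.
val nums = sc.parallelize(Seq(1, 2, 3))
// map is a transformation: no job is submitted and nothing is printed here
val doubled = nums.map { i =>
  println("computing " + i)
  i * 2
}
// collect is an Action: only now does Spark execute the map (and print the lines above)
println(doubled.collect().toList)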
An RDD can be created in three ways: directly from a local collection, by reading an external dataset, or by deriving it from another RDD.
First, declare a SparkContext:
val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc = new SparkContext(conf)
SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
from pyspark import SparkConf, SparkContext, StorageLevel
import findspark

if __name__ == '__main__':
    findspark.init()
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)
Create an RDD from a local collection:
val rdd1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
//specify the number of partitions
val rdd2 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""), 5)
JavaRDD rdd1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
//specify the number of partitions
JavaRDD rdd2 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""), 5);
rdd1 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""])
# specify the number of partitions
rdd2 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""], 5)
Create an RDD by reading an external dataset:
// Read a local file
val rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
// Read a local file with a specified number of partitions
val rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
// Read an HDFS file
val rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
// Read all files under a directory (to also get the file names, see the wholeTextFiles sketch below)
val rdd6 = sc.textFile("hdfs://test/spark/input3/")
// Read a local file
JavaRDD rdd3 = sc.textFile("D:/test/spark/input3/words.txt");
// Read a local file with a specified number of partitions
JavaRDD rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5);
// Read an HDFS file
JavaRDD rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt");
// Read all files under a directory
JavaRDD rdd6 = sc.textFile("hdfs://test/spark/input3/");
# Read a local file
rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
# Read a local file with a specified number of partitions
rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
# Read an HDFS file
rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
# Read all files under a directory
rdd6 = sc.textFile("hdfs://test/spark/input3/")
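Note that textFile on a directory only reads the file contents. If the file names are also needed, wholeTextFiles can be used instead; a small sketch, reusing the directory from the example above:
// wholeTextFiles returns (fileName, fileContent) pairs, one per file in the directory
val withNames = sc.wholeTextFiles("hdfs://test/spark/input3/")
withNames.keys.foreach(println) // print just the file names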
The most common operators are demonstrated below.
map: transforms each element of the RDD one-to-one into another form.
For example:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(num.map(_+1).collect().toList
)
JavaRDD num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
System.out.println(num.map(i -> i + 1).collect()
);
num = sc.parallelize((1, 2, 3, 4, 5))
print(num.map(lambda i:i+1).collect()
)
flatMap: similar to map, but flatMap maps one element to many and flattens the results into one dimension.
For example:
val text = sc.parallelize(Seq("abc def", "hello word", "dfg,okh", "he,word"))
println(text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList
)
JavaRDD text = sc.parallelize(Arrays.asList("abc def", "hello word", "dfg,okh", "he,word"));
System.out.println(text.flatMap(s ->Arrays.asList(s.split(" ")).iterator()).flatMap(s ->Arrays.asList(s.split(",")).iterator()).collect()
);
text = sc.parallelize(("abc def", "hello word", "dfg,okh", "he,word"))
print(text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect()
)
filter: filters out unwanted elements.
For example:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"))
println(text.filter(_.equals("hello")).collect().toList
)
JavaRDD text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"));
System.out.println(text.filter(s -> Objects.equals(s,"hello")).collect()
);
text = sc.parallelize(("hello", "hello", "word", "word"))
print(text.filter(lambda s: s == 'hello').collect()
)
mapPartitions: similar to map, but it transforms a whole partition at a time; the function receives the collection of elements in each partition.
For example:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(text.mapPartitions(iter => {iter.map(_ + "333")}).collect().toList
)
JavaRDD text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(text.mapPartitions(iter -> {
    List<String> list = new ArrayList<>();
    iter.forEachRemaining(s -> list.add(s + "333"));
    return list.iterator();
}).collect());
text = sc.parallelize(("hello", "hello", "word", "word"), 2)

def partition(par):
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr

print(text.mapPartitions(partition).collect())
mapPartitionsWithIndex: similar to mapPartitions, except the function also receives the partition index.
For example:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(text.mapPartitionsWithIndex((index, iter) => {
  println("current partition: " + index)
  iter.map(_ + "333")
}, true).collect().toList)
JavaRDD text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(text.mapPartitionsWithIndex((index, iter) -> {
    System.out.println("current partition: " + index);
    List<String> list = new ArrayList<>();
    iter.forEachRemaining(s -> list.add(s + "333"));
    return list.iterator();
}, true).collect());
text = sc.parallelize(("hello", "hello", "word", "word"), 2)

def partition(index, par):
    print("current partition: " + str(index))
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr

print(text.mapPartitionsWithIndex(partition).collect())
mapValues: only works on Key-Value data. Like map, it transforms the data with a function, but mapValues transforms only the Value of each Key-Value pair.
For example:
val text = sc.parallelize(Seq("abc", "bbb", "ccc", "dd"))
println(text.map(s => (s, "v" + s)).mapValues(_ + "66").collect().toList)
JavaRDD text = sc.parallelize(Arrays.asList("abc", "bbb", "ccc", "dd"));
System.out.println(text.mapToPair(s -> new Tuple2<>(s, "v" + s)).mapValues(v -> v + "66").collect()
);
text = sc.parallelize(("abc", "bbb", "ccc", "dd"))
print(text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect()
)
sample: draws a sample from the dataset. It is commonly used to shrink a dataset so it runs faster while losing as little of its statistical regularity as possible:
The first parameter, withReplacement, controls whether sampled elements are put back into the source dataset for later draws; simply put, if it is true the sample may contain duplicates.
The second parameter, fraction, is the sampling ratio.
The third parameter, seed, is the random seed that sample uses internally to generate indices; it is usually left unspecified so the default is used.
For example:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(num.sample(true,0.6,2).collect().toList
)
JavaRDD num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
System.out.println(num.sample(true, 0.6, 2).collect()
);
num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
print(num.sample(True, 0.6, 2).collect()
)
union: the union of two datasets, similar to UNION in a database.
For example:
val text1 = sc.parallelize(Seq("aa", "bb"))
val text2 = sc.parallelize(Seq("cc", "dd"))
println(text1.union(text2).collect().toList
)
JavaRDD text1 = sc.parallelize(Arrays.asList("aa", "bb"));
JavaRDD text2 = sc.parallelize(Arrays.asList("cc", "dd"));
System.out.println(text1.union(text2).collect()
);
text1 = sc.parallelize(("aa", "bb"))
text2 = sc.parallelize(("cc", "dd"))
print(text1.union(text2).collect()
)
join: joins two (key, value) datasets by key (inner join, left outer join, right outer join), similar to joins in a database.
For example:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
val s3 = s1.map(s => (s.split(",")(0), s.split(",")(1)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))
println(s3.join(s4).collectAsMap)
println(s3.leftOuterJoin(s4).collectAsMap)
println(s3.rightOuterJoin(s4).collectAsMap)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));
JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
System.out.println(s3.join(s4).collectAsMap());
System.out.println(s3.leftOuterJoin(s4).collectAsMap());
System.out.println(s3.rightOuterJoin(s4).collectAsMap());
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
s3 = s1.map(lambda s: (s.split(",")[0], s.split(",")[1]))
s4 = s2.map(lambda s: (s.split(",")[0], s.split(",")[1]))
print(s3.join(s4).collectAsMap())
print(s3.leftOuterJoin(s4).collectAsMap())
print(s3.rightOuterJoin(s4).collectAsMap())
intersection: returns the intersection of two datasets.
For example:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(s1.intersection(s2).collect().toList
)
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(s1.intersection(s2).collect()
);
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(s1.intersection(s2).collect()
)
subtract: returns the difference a - b, i.e. the elements of dataset a that dataset b does not contain.
For example:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(s1.subtract(s2).collect().toList
)
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(s1.subtract(s2).collect()
);
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(s1.subtract(s2).collect()
)
distinct: removes duplicate elements. It is a Shuffled operation.
For example:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.distinct().collect().toList
)
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.distinct().collect()
);
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.distinct().collect()
)
reduceByKey: only works on Key-Value data. It groups the data by Key and then runs a reduce function over each group. The function takes two arguments, the current value and the running partial aggregate, and must return a single value, which becomes the aggregate for that Key. It is a Shuffled operation.
For example:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.map((_, 1)).reduceByKey(Integer.sum).collectAsMap
)
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey(Integer::sum).collectAsMap()
);
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.map(lambda s: (s, 1)).reduceByKey(lambda v1, v2: v1 + v2).collectAsMap()
)
groupByKey: only works on Key-Value data. It groups the data by Key, somewhat like reduceByKey, except that groupByKey does not aggregate; it simply lists all the Values for each Key. It is a Shuffled operation.
groupByKey differs from reduceByKey in that it has to list every value of a Key, so it cannot combine on the Map side; as a result, groupByKey does not perform as well as reduceByKey (see the comparison sketch after the examples below).
For example:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.map((_, 1)).groupByKey().collectAsMap
)
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s, 1)).groupByKey().collectAsMap()
);
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.map(lambda s: (s, 1)).groupByKey().collectAsMap())
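A small comparison sketch (Scala, assuming the same sc) of why reduceByKey usually outperforms groupByKey for aggregation: both lines compute the same counts, but reduceByKey pre-aggregates inside each partition before the shuffle, while groupByKey has to shuffle every single (word, 1) pair.
val pairs = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word")).map((_, 1))
// groupByKey: all pairs are shuffled first, then summed per key
println(pairs.groupByKey().mapValues(_.sum).collectAsMap())
// reduceByKey: partial sums are computed per partition (map-side combine), then shuffled
println(pairs.reduceByKey(_ + _).collectAsMap())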
combineByKey: aggregates a dataset by Key; groupByKey and reduceByKey are both implemented on top of combineByKey.
Parameters:
createCombiner: performs the initial conversion of each Value
mergeValue: aggregates the converted values within each partition
mergeCombiners: merges the per-partition aggregates across all partitions
partitioner: optional, the partitioning function
mapSideCombine: optional, whether to combine on the Map side
serializer: the serializer
For example, computing each person's average score:
val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
println(s1.map(s => (s.split(":")(0), s.split(":")(1).toDouble))
  .combineByKey(
    score => (score, 1),
    (c: (Double, Int), newScore: Double) => (c._1 + newScore, c._2 + 1),
    (d1: (Double, Int), d2: (Double, Int)) => (d1._1 + d2._1, d1._2 + d2._2))
  .map(t => (t._1, t._2._1 / t._2._2))
  .collectAsMap)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
        .combineByKey(
                (Function<Double, Tuple2<Double, Integer>>) score -> new Tuple2<>(score, 1),
                (Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>) (c, newScore) -> new Tuple2<>(c._1 + newScore, c._2 + 1),
                (Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>) (d1, d2) -> new Tuple2<>(d1._1 + d2._1, d1._2 + d2._2))
        .mapToPair(t -> new Tuple2<>(t._1, t._2._1 / t._2._2))
        .collectAsMap());
s1 = sc.parallelize(("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
print(s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
      .combineByKey(lambda score: (score, 1),
                    lambda c, newScore: (c[0] + newScore, c[1] + 1),
                    lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1]))
      .map(lambda t: (t[0], t[1][0] / t[1][1]))
      .collectAsMap())
cogroup: groups multiple RDDs together, collecting the Values that share the same Key across the RDDs.
For example:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
val s3 = s1.map(s => (s.split(",")(0), s.split(",")(1)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))
println(s3.cogroup(s4).collectAsMap)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));
JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
System.out.println(s3.cogroup(s4).collectAsMap());
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
s3 = s1.map(lambda s: (s.split(",")[0], s.split(",")[1]))
s4 = s2.map(lambda s: (s.split(",")[0], s.split(",")[1]))
print(s3.cogroup(s4).collectAsMap()
)
sortBy: sorts the data. sortByKey does the same, but a plain RDD has no sortByKey; only Key-Value RDDs have it.
Parameters:
func: a function that returns the field to sort by
ascending: whether to sort in ascending order
numPartitions: the number of partitions
For example:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = s1.map(s => (s.split(",")(0), s.split(",")(1).toInt))
println(s2.sortBy(_._2, false).collect().toList)
println(s2.sortByKey(false).collect().toList)
JavaRDD s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
System.out.println(s1.map(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1]))).sortBy(t -> t._2, false, 1).collect()
);
System.out.println(s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1]))).sortByKey(false).collect()
);
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = s1.map(lambda s:(s.split(",")[0],int(s.split(",")[1])))
print(s2.sortBy(lambda t: t[1], False).collect())
print(s2.sortByKey(False).collect())
repartition: repartitions the data; coalesce: reduces the number of partitions. If the new number of partitions is larger than the current one, a Shuffle is required, otherwise the change has no effect. The difference between repartition and coalesce is that coalesce lets you control whether to Shuffle, while repartition is always a Shuffled operation (a sketch of coalesce's shuffle flag follows the examples below).
For example:
var p1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
println(p1.getNumPartitions)
p1 = p1.repartition(5)
println(p1.getNumPartitions)
p1 = p1.coalesce(3)
println(p1.getNumPartitions)
JavaRDD p1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
System.out.println(p1.getNumPartitions());
p1 = p1.repartition(5);
System.out.println(p1.getNumPartitions());
p1 = p1.coalesce(3);
System.out.println(p1.getNumPartitions());
p1 = sc.parallelize(("abc", "abc", "fff dd", "ee,pp", ""))
print(p1.getNumPartitions())
p1 = p1.repartition(5)
print(p1.getNumPartitions())
p1 = p1.coalesce(3)
print(p1.getNumPartitions())
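A minimal sketch of the shuffle flag on coalesce (Scala, assuming the same sc): without shuffle = true, coalesce cannot increase the number of partitions.
val p2 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""), 3)
println(p2.coalesce(6).getNumPartitions)                 // still 3: no shuffle, so the increase is ignored
println(p2.coalesce(6, shuffle = true).getNumPartitions) // 6: with a shuffle, partitions can be added
println(p2.coalesce(2).getNumPartitions)                 // 2: reducing partitions needs no shuffle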
reduce: aggregates the whole dataset down to a single value, a summary of the entire dataset.
reduce is completely different from reduceByKey: reduce is an action, not a Shuffled operation. Essentially, reduce first computes a value on each partition and then merges the per-partition results (see the caveat sketch after the examples below).
For example:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.reduce((_+_))
)
JavaRDD p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.reduce(Integer::sum)
);
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.reduce(lambda i1, i2: i1 + i2))
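One caveat worth a sketch (Scala, assuming the same sc): because reduce first reduces inside each partition and then merges the partial results, the function should be associative and commutative; otherwise the outcome depends on how the data is partitioned.
val nums = sc.parallelize(Seq(1, 2, 3, 4, 6), 2)
println(nums.reduce(_ + _)) // 16: addition is associative, the result does not depend on partitioning
println(nums.reduce(_ - _)) // subtraction is not associative, so this value depends on the partitioning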
collect: returns all elements of the dataset as an array.
For example:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.collect()
)
JavaRDD p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.collect()
);
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.collect()
)
count: the number of elements in the dataset.
For example:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.count()
)
JavaRDD p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.count()
);
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.count()
)
first: returns the first element.
For example:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.first()
)
JavaRDD p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.first()
);
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.first()
)
countByKey: returns every Key in the dataset together with the number of times each Key occurs.
For example:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.map((_,1)).countByKey()
)
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s, 1)).countByKey()
);
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.map(lambda s: (s, 1)).countByKey()
)
take: returns the first N elements.
For example:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.take(3)
)
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.take(3)
);
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.take(3)
)
saveAsTextFile: saves the results into the directory given by path.
For example:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
s1.saveAsTextFile("D:/test/output/text/")
JavaRDD s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
s1.saveAsTextFile("D:/test/output/text/");
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
s1.saveAsTextFile("D:/test/output/text/")