Spark - RDD Operators: Introduction and Usage, Demonstrated in Scala, Java, and Python
2024-02-21 03:22:06

I. Origins of the RDD

Before RDDs appeared, MapReduce was the dominant model. A MapReduce pipeline executes as follows:
[Figure: a chain of MapReduce jobs passing intermediate results through disk]
Multiple MapReduce jobs can only pass data to one another through disk, which is clearly inefficient. Now look at how RDDs handle the same pipeline:

[Figure: an RDD pipeline sharing intermediate results in memory]
The whole pipeline shares data through memory, with no need to park intermediate results in a distributed file system. This approach keeps fault tolerance while offering more flexibility and faster execution.

II. Characteristics of RDDs

An RDD is not only a dataset but also a programming model. It provides a high-level API, and the RDD API closely resembles the JDK 8 Stream API for collection operations; both are built from chained operators, as shown below:

textFile.filter(StringUtils.isNotBlank)       // filter out blank lines
  .flatMap(_.split(" "))                      // split on spaces
  .map((_, 1))                                // build (word, 1) pairs
  .foreach(s => println(s._1 + "  " + s._2))  // print each pair

RDD operators fall roughly into two categories:

  • Transformation operators, such as map, flatMap, and filter.
  • Action operators, such as reduce, collect, and count.

Note: RDDs are evaluated lazily. When execution reaches a transformation, nothing runs immediately; only when an Action operator is encountered is the real computation triggered.
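To see the laziness directly: the side effect inside map below does not fire when map is called, only once collect (an Action) runs. A minimal Scala sketch, assuming an already-initialized SparkContext named sc (declared in the next section):

val nums = sc.parallelize(Seq(1, 2, 3))
val doubled = nums.map { i =>
  println("computing " + i) // nothing is printed yet: map is a lazy transformation
  i * 2
}
doubled.collect() // only now does a job run and the function inside map execute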

Creating RDDs

There are three ways to create an RDD: directly from a local collection, by reading an external dataset, or by deriving it from another RDD (an example of the third way follows the file-based examples below):

First, declare a SparkContext:

  • scala:
val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc = new SparkContext(conf)
  • java
SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
  • python
import findspark
findspark.init()  # must run before pyspark is used so the Spark installation can be found

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)

1. Creating from a collection

  • scala
val rdd1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
//specify the number of partitions
val rdd2 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""), 5)
  • java
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
//specify the number of partitions
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""), 5);
  • python
rdd1 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""])
# specify the number of partitions
rdd2 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""], 5)

2. Creating from a file

  • scala
//read a local file
val rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
//read a local file with an explicit number of partitions
val rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
//read an HDFS file
val rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
//read every file under a directory (use wholeTextFiles to also get each file's name)
val rdd6 = sc.textFile("hdfs://test/spark/input3/")
  • java
//read a local file
JavaRDD<String> rdd3 = sc.textFile("D:/test/spark/input3/words.txt");
//read a local file with an explicit number of partitions
JavaRDD<String> rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5);
//read an HDFS file
JavaRDD<String> rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt");
//read every file under a directory (use wholeTextFiles to also get each file's name)
JavaRDD<String> rdd6 = sc.textFile("hdfs://test/spark/input3/");
  • python
# read a local file
rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
# read a local file with an explicit number of partitions
rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
# read an HDFS file
rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
# read every file under a directory (use wholeTextFiles to also get each file's name)
rdd6 = sc.textFile("hdfs://test/spark/input3/")
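The third way, deriving an RDD from an existing one, is simply the result of applying any transformation; for example, reusing rdd3 from above:

// every transformation returns a new RDD derived from its parent
val rdd7 = rdd3.flatMap(_.split(" "))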

The sections below demonstrate the most commonly used operators.

III. Transformation Operators

1. map

Transforms each element of the RDD one-to-one into another form:

For example:

  • scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(num.map(_ + 1).collect().toList)
  • java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
System.out.println(num.map(i -> i + 1).collect());
  • python:
num = sc.parallelize((1, 2, 3, 4, 5))
print(num.map(lambda i: i + 1).collect())


2. flatMap

Similar to map, but flatMap maps each element to many, flattening everything into a one-dimensional collection:

For example:

  • scala:
val text = sc.parallelize(Seq("abc def", "hello word", "dfg,okh", "he,word"))
println(text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc def", "hello word", "dfg,okh", "he,word"));
System.out.println(text.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
        .flatMap(s -> Arrays.asList(s.split(",")).iterator()).collect());
  • python:
text = sc.parallelize(("abc def", "hello word", "dfg,okh", "he,word"))
print(text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect())


3. filter

Filters out unwanted elements:

For example:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"))
println(text.filter(_.equals("hello")).collect().toList)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"));
System.out.println(text.filter(s -> Objects.equals(s, "hello")).collect());
  • python:
text = sc.parallelize(("hello", "hello", "word", "word"))
print(text.filter(lambda s: s == 'hello').collect())


4. mapPartitions

Similar to map, but it transforms a whole partition at a time; the function receives each partition as an iterator:

For example:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(text.mapPartitions(iter => iter.map(_ + "333")).collect().toList)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(text.mapPartitions(iter -> {
    List<String> list = new ArrayList<>();
    iter.forEachRemaining(s -> list.add(s + "333"));
    return list.iterator();
}).collect());
  • python:
text = sc.parallelize(("hello", "hello", "word", "word"), 2)

def partition(par):
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr

print(text.mapPartitions(partition).collect())


5. mapPartitionsWithIndex

Similar to mapPartitions, except the function also receives the partition index:

For example:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(text.mapPartitionsWithIndex((index, iter) => {
  println("current partition: " + index)
  iter.map(_ + "333")
}, true).collect().toList)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(text.mapPartitionsWithIndex((index, iter) -> {
    System.out.println("current partition: " + index);
    List<String> list = new ArrayList<>();
    iter.forEachRemaining(s -> list.add(s + "333"));
    return list.iterator();
}, true).collect());
  • python:
text = sc.parallelize(("hello", "hello", "word", "word"), 2)

def partition(index, par):
    print("current partition: " + str(index))
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr

print(text.mapPartitionsWithIndex(partition).collect())


6. mapValues

Only works on Key-Value data. Like map, it applies a function to transform the data, except that mapValues transforms only the Value of each Key-Value pair:

For example:

  • scala:
val text = sc.parallelize(Seq("abc", "bbb", "ccc", "dd"))
println(text.map(s => (s, "v" + s)).mapValues(_ + "66").collect().toList)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc", "bbb", "ccc", "dd"));
System.out.println(text.mapToPair(s -> new Tuple2<>(s, "v" + s)).mapValues(v -> v + "66").collect());
  • python:
text = sc.parallelize(("abc", "bbb", "ccc", "dd"))
print(text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect())


7. sample

Draws a sample from a dataset; this is commonly used to shrink a dataset so jobs run faster while losing as little of the data's character as possible:

The first parameter, withReplacement, controls whether sampled elements are put back into the dataset for later draws; simply put, if it is true the sample may contain duplicates.

The second parameter, fraction, is the fraction of the data to sample.

The third parameter, seed, is the random seed sample uses internally to generate indices; it is usually left unspecified so the default is used.

For example:

  • scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(num.sample(true, 0.6, 2).collect().toList)
  • java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
System.out.println(num.sample(true, 0.6, 2).collect());
  • python:
num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
print(num.sample(True, 0.6, 2).collect())


8. union

The union of two datasets, similar to UNION in a database:

For example:

  • scala:
val text1 = sc.parallelize(Seq("aa", "bb"))
val text2 = sc.parallelize(Seq("cc", "dd"))
println(text1.union(text2).collect().toList)
  • java:
JavaRDD<String> text1 = sc.parallelize(Arrays.asList("aa", "bb"));
JavaRDD<String> text2 = sc.parallelize(Arrays.asList("cc", "dd"));
System.out.println(text1.union(text2).collect());
  • python:
text1 = sc.parallelize(("aa", "bb"))
text2 = sc.parallelize(("cc", "dd"))
print(text1.union(text2).collect())


9. join, leftOuterJoin, rightOuterJoin

Given two (key, value) datasets, joins them by key with an inner, left outer, or right outer join, similar to joins in a database:

For example:

  • scala:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
val s3 = s1.map(s => (s.split(",")(0), s.split(",")(1)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))
println(s3.join(s4).collectAsMap)
println(s3.leftOuterJoin(s4).collectAsMap)
println(s3.rightOuterJoin(s4).collectAsMap)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));
JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
System.out.println(s3.join(s4).collectAsMap());
System.out.println(s3.leftOuterJoin(s4).collectAsMap());
System.out.println(s3.rightOuterJoin(s4).collectAsMap());
  • python:
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
s3 = s1.map(lambda s: (s.split(",")[0], s.split(",")[1]))
s4 = s2.map(lambda s: (s.split(",")[0], s.split(",")[1]))
print(s3.join(s4).collectAsMap())
print(s3.leftOuterJoin(s4).collectAsMap())
print(s3.rightOuterJoin(s4).collectAsMap())


10. intersection

Returns the intersection of two collections:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(s1.intersection(s2).collect().toList)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(s1.intersection(s2).collect());
  • python:
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(s1.intersection(s2).collect())


11. subtract

Returns the difference a - b: the elements of collection a that collection b does not contain:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(s1.subtract(s2).collect().toList)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(s1.subtract(s2).collect());
  • python:
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(s1.subtract(s2).collect())


12. distinct

Removes duplicate elements; this operation requires a shuffle:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.distinct().collect().toList)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.distinct().collect());
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.distinct().collect())


13. reduceByKey

Only works on Key-Value data. It groups records by Key, then runs a reduce over each group: the reduce function receives two arguments, the running subtotal and the next value, and must return a result, which becomes the aggregate for that Key. This operation requires a shuffle:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.map((_, 1)).reduceByKey(_ + _).collectAsMap)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey(Integer::sum).collectAsMap());
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.map(lambda s: (s, 1)).reduceByKey(lambda v1, v2: v1 + v2).collectAsMap())


14. groupByKey

Only works on Key-Value data. It groups records by Key, somewhat like reduceByKey, but groupByKey performs no aggregation; it simply lists every Value belonging to each Key. This operation requires a shuffle.

Unlike reduceByKey, groupByKey has to ship every value for a key, so it cannot combine on the map side; as a result its performance is worse than reduceByKey's:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.map((_, 1)).groupByKey().collectAsMap)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s, 1)).groupByKey().collectAsMap());
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
# mapValues(list) makes the grouped values printable
print(s1.map(lambda s: (s, 1)).groupByKey().mapValues(list).collectAsMap())


15. combineByKey

Aggregates the dataset by Key; both groupByKey and reduceByKey are implemented on top of combineByKey.

Parameters:

  • createCombiner: performs the initial transformation of each Value
  • mergeValue: aggregates the transformed values within each partition
  • mergeCombiners: merges the per-partition aggregates across all partitions
  • partitioner: optional, the partitioner to use
  • mapSideCombine: optional, whether to combine on the map side
  • serializer: the serializer to use

For example, computing each person's average score:

  • scala:
val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
println(s1.map(s => (s.split(":")(0), s.split(":")(1).toDouble))
  .combineByKey(
    score => (score, 1),
    (c: (Double, Int), newScore: Double) => (c._1 + newScore, c._2 + 1),
    (d1: (Double, Int), d2: (Double, Int)) => (d1._1 + d2._1, d1._2 + d2._2))
  .map(t => (t._1, t._2._1 / t._2._2))
  .collectAsMap)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
        .combineByKey(
                (Function<Double, Tuple2<Double, Integer>>) score -> new Tuple2<>(score, 1),
                (Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>) (c, newScore) -> new Tuple2<>(c._1 + newScore, c._2 + 1),
                (Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>) (d1, d2) -> new Tuple2<>(d1._1 + d2._1, d1._2 + d2._2))
        .mapToPair(t -> new Tuple2<>(t._1, t._2._1 / t._2._2))
        .collectAsMap());
  • python:
s1 = sc.parallelize(("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
print(s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
      .combineByKey(lambda score: (score, 1),
                    lambda c, newScore: (c[0] + newScore, c[1] + 1),
                    lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1]))
      .map(lambda t: (t[0], t[1][0] / t[1][1])).collectAsMap())


16. cogroup

Co-groups multiple RDDs: the Values sharing the same Key across the RDDs are grouped together:

For example:

  • scala:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
val s3 = s1.map(s => (s.split(",")(0), s.split(",")(1)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))
println(s3.cogroup(s4).collectAsMap)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));
JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
System.out.println(s3.cogroup(s4).collectAsMap());
  • python:
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
s3 = s1.map(lambda s: (s.split(",")[0], s.split(",")[1]))
s4 = s2.map(lambda s: (s.split(",")[0], s.split(",")[1]))
print(s3.cogroup(s4).collectAsMap())


17. sortBy, sortByKey

Sorts the data. sortByKey does the same, but a plain RDD has no sortByKey; only Key-Value RDDs do.

Parameters:

  • func: returns the field to sort by
  • ascending: whether to sort in ascending order
  • numPartitions: the number of partitions

For example:

  • scala:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = s1.map(s => (s.split(",")(0), s.split(",")(1).toInt))
// collect to a list: collecting to a Map would throw away the sort order
println(s2.sortBy(_._2, false).collect().toList)
println(s2.sortByKey(false).collect().toList)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
System.out.println(s1.map(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1])))
        .sortBy(t -> t._2, false, 1).collect());
System.out.println(s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1])))
        .sortByKey(false).collect());
  • python:
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = s1.map(lambda s: (s.split(",")[0], int(s.split(",")[1])))
# collect() keeps the sort order; collectAsMap() would discard it
print(s2.sortBy(lambda t: t[1], False).collect())
print(s2.sortByKey(False).collect())


18. repartition, coalesce

repartition repartitions the data; coalesce reduces the number of partitions. If the new partition count is larger than the current one, a shuffle is required, otherwise the repartitioning has no effect. The difference between the two is that coalesce lets you control whether to shuffle, whereas repartition always shuffles (see the sketch after the examples):

For example:

  • scala:
var p1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
println(p1.getNumPartitions)
p1 = p1.repartition(5)
println(p1.getNumPartitions)
p1 = p1.coalesce(3)
println(p1.getNumPartitions)
  • java:
JavaRDD<String> p1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
System.out.println(p1.getNumPartitions());
p1 = p1.repartition(5);
System.out.println(p1.getNumPartitions());
p1 = p1.coalesce(3);
System.out.println(p1.getNumPartitions());
  • python:
p1 = sc.parallelize(("abc", "abc", "fff dd", "ee,pp", ""))
print(p1.getNumPartitions())
p1 = p1.repartition(5)
print(p1.getNumPartitions())
p1 = p1.coalesce(3)
print(p1.getNumPartitions())
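As noted above, coalesce can only increase the partition count if it is allowed to shuffle. A minimal Scala sketch of its shuffle flag, which defaults to false:

val p2 = sc.parallelize(Seq("a", "b", "c"), 2)
println(p2.coalesce(5).getNumPartitions)                 // still 2: without a shuffle, coalesce cannot add partitions
println(p2.coalesce(5, shuffle = true).getNumPartitions) // 5: with a shuffle the count can grow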

IV. Action Operators

1. reduce

Reduces the whole dataset down to a single value, a summary of the entire dataset.

reduce is completely different from reduceByKey: reduce is an action, not a shuffled operation. In essence, reduce computes a value on each partition first and then merges the per-partition results.

For example:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.reduce(_ + _))
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.reduce(Integer::sum));
  • python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.reduce(lambda i1, i2: i1 + i2))


2. collect

Returns all elements of the dataset to the driver as an array.
For example:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.collect().toList)
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.collect());
  • python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.collect())


3. count

Returns the number of elements in the dataset:

For example:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.count())
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.count());
  • python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.count())


4. first

Returns the first element:

For example:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(p1.first())
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(p1.first());
  • python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(p1.first())


5. countByKey

Returns every Key in the dataset along with the number of times it occurs:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.map((_, 1)).countByKey())
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.mapToPair(s -> new Tuple2<>(s, 1)).countByKey());
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.map(lambda s: (s, 1)).countByKey())


6. take

Returns the first N elements:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(s1.take(3).toList)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(s1.take(3));
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(s1.take(3))


7. saveAsTextFile

Saves the results into the directory given by path:

For example:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
s1.saveAsTextFile("D:/test/output/text/")
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
s1.saveAsTextFile("D:/test/output/text/");
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
s1.saveAsTextFile("D:/test/output/text/")

