spark RDD中的并行度、分区器默认策略
创始人
2025-05-28 03:20:36

源头RDD

  • 源头RDD有自己的分区计算逻辑,一般没有分区器,并行度是根据分区算法自动计算的,RDD的compute函数中记录了数据如何而来,如何分区的

  • hadoopRDD,根据XxxinputFormat.getInputSplits()来决定,比如默认的TextInputFormat将文件按照0-128M进行切割,剩余部分是否小于128M的1.1倍

  • JdbcRDD,需要指定一个数字类型的字段,而且指定上界,下届,然后指定并行度进行分割

  • 大部分源头RDD都有传入并行度的参数

  • 有些源头rdd按照backend.defaultParallelism()获取默认并行度,比如sc.paralleize("a","b")

  • 本地模式:getInt("spark.default.parallelism", totalCores),根据spark.default.parallelism参数,如果没配置就是机器的总逻辑核数,setMaster(local[*]) *代表全部逻辑核

  • 集群模式:getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)),根据spark.default.parallelism参数,如果没配置就是yarn的executor的总逻辑核数,最小也得两个并行度

窄依赖RDD

  • 窄依赖RDD,没有自己的分区器(也可以设置为沿用父rdd的分区器),默认集成它的父RDD的并行度

  • 有个特殊的窄依赖算子,coalesce(2,true),他需要指定并行度

宽依赖 RDD

  • 宽依赖 RDD(ShuffledRDD),这种RDD是通过进行shuffle的算子得到的,shuffle算子必须要一个分区器,要么传,要么走默认分区器

指定并行度

  • shuffle算子指定并行度,groupBy(f(),4) //分区器为:hashpartitioner,并行度为4

  • 有些特殊的shuffle算子,默认分区器不是hashpartitioner。比如sortBy(),他是RangePartitiner,但是sortBy其实用的是sortByKey然后取value 最终.values把分区器搞丢了

指定分区器

  • shuffle算子指定分区器,groupBy(f(),RangePartitioner(4)) //分区器为:RangePartitioner,并行度为4

默认分区器策略

  • 按照 Partitioner.defaultPartitioner( )获取分区器

最大并行度:父RDD中并行度的最大值

默认并行度:spark.default.parallelism值,如果没设置这个参数,则取最大并行度

最大分区器:父RDD中并行度最大的分区器

  • 有最大分区器,而且以下两个条件满足一个即可,此时沿用最大分区器

  • 父 RDD 中的最大并行度/最大分区器所在RDD的并行度<10

  • 默认并行度小于等于最大分区器所在rdd的并行度

  • 没有最大分区器,默认以hashPatitioner作为分区器,并行度为默认并行度

例1

如果:rdd1:并行度500,分区器NONE,rdd2: 并行度100,分区器HashPartitioner

var rdd3 = rdd1.join(rdd2)

问:此时rdd3的分区器和并行度有多大?

  • 最大并行度:父RDD中并行度的最大值=>500

  • 默认并行度:取spark.default.parallelism值,如果没有则最大并行度=>500

  • 最大分区器:父RDD中并行度最大的分区器=>HashPartitioner

  • 判断能否沿用父RDD最大分区器

  • 存在最大分区器(满足)

  • 以下两个条件满足一个

  • 最大并行度(500)/最大分区器所在RDD的并行度(100)<10 (满足)

  • 默认并行度(500)小于等于 最大分区器所在rdd的并行度(100)(不满足)

因此可以沿用父RDD最大分区器,所以rdd3的分区器:Hashpartitioner,并行度100

例2

如果:rdd1:并行度2000,分区器None,rdd2: 并行度100,分区器HashPartitioner

var rdd3 = rdd1.join(rdd2)

问:此时rdd3的分区器和并行度有多大?

  • 最大并行度:父RDD中并行度的最大值=>2000

  • 默认并行度:取spark.default.parallelism值,如果没有则最大并行度=>2000

  • 最大分区器:父RDD中并行度最大的分区器=>HashPartitioner

  • 判断能否沿用父RDD最大分区器

  • 存在最大分区器(满足)

  • 以下两个条件满足一个

  • 最大并行度(2000)/最大分区器所在RDD的并行度(100)<10 (不满足)

  • 默认并行度(2000)小于等于 最大分区器所在rdd的并行度(2000)(不满足)

因此不可以沿用父RDD最大分区器,所以rdd3的分区器:采用默认的分区器、采用默认并行度HashPartitioner(2000)

例3

如果:rdd1:并行度2000,分区器None,rdd2: 并行度100,分区器HashPartitioner ,conf.set("spark.default.parallelism",100)

var rdd3 = rdd1.join(rdd2)

问:此时rdd3的分区器和并行度有多大?

  • 最大并行度:父RDD中并行度的最大值=>2000

  • 默认并行度:取spark.default.parallelism值,如果没有则最大并行度=>100

  • 最大分区器:父RDD中并行度最大的分区器=>HashPartitioner

  • 判断能否沿用父RDD最大分区器

  • 存在最大分区器(满足)

  • 以下两个条件满足一个

  • 最大并行度(2000)/最大分区器所在RDD的并行度(100)<10 (不满足)

  • 默认并行度(100)小于等于 最大分区器所在rdd的并行度(100)(满足)

因此可以沿用父RDD最大分区器,所以rdd3的分区器:Hashpartitioner,并行度100

关于是否Shuffle问题延申1

例1中rdd1.join(rdd2).reduceByKey()

问:此时reduceByKey操作会发生shuffle吗?

答案是不会,虽然reduceByKey产生的Dependency是ShuffleDependency,但是它的分区器和它父RDD的分区器是同一个分区器(沿用父RDD的分区器),所以不会有shuffle产生

关于是否Shuffle问题延申2

例1中rdd1.join(rdd2).repartition(100).reduceByKey()

问:此时reduceByKey操作会发生shuffle吗?

答案是会,按道理来说repartition(100)会产生一个Hashpartitioner(100),reduceByKey(),会沿用repartition产生的rdd的分区器,导致reduceByKey()不会产生shuffle,那为何还会有呢?

这个就是上边说的repartition算子中.values把分区器搞丢了

repartition其实是调用的coalesce(100,true)

new CoalescedRDD(new ShuffledRDD).values

也就是说,一次repartition,产生了3个rdd

  • CoalescedRDD

  • ShuffledRDD

  • MapPartitionsRDD ----.value产生的相当于map取value,这是一个窄依赖,会把RDD[T]的分区器擦除掉,因此再进行reduceByKey的时候看似父节点有分区器,其实被repartition内部的RDD转换隐形擦除了,导致shuffle产生(上述sortBy也有这个问题)

关于是否Shuffle问题延申3

问:rdd1.join(rdd2)什么情况下不发生shuffle?

当rdd1和rdd2以及join产生的新rdd采用相同的分区器,那么就不会产生shuffle

val rdd1:RDD[(String,Int)]rdda.partitionBy(new HashPartitioner(partitions 2))
val rdd2:RDD[(String,Int)]rddb.partitionBy(new HashPartitioner(partitions=2))
rdd1.join(rdd2,new HashPartitioner(partitions 2))

问:如果rdd1.join(rdd2) ,join算子不传入分区器会有shuffle吗?

不会shuffle,因为会沿用父节点的最大分区器。最终结果还是默认产生了一个Hashpartitioner(2)

总结

如果父RDD最大分区器的并行度能够承载其它父RDD的并行度(10倍以内),那就沿用父RDD的最大分区器

如果父RDD没有最大分区器或者承载不了那就new HashPartitioner(defaultParalleize)

通常父RDD只有一个,那就没这么麻烦,父RDD有分区器的直接沿用父RDD的分区器,没有就

new HashPartitioner(defaultParalleize)

但是!!!!!!!!!!

有的看似父RDD有分区器,依然没有被沿用,请注意算子内部的RDD转换导致分区器被隐形擦除了

相关内容

热门资讯

ureport知识点分享【看本... 一、简单配置一个基础报表准备个简单的excel,如下图准备好excel要展示的数据库查...
MCU上调试CAN总线问题汇总... 目录 问题一:两个can设备无法相互间收发数据 原因: 问题二ÿ...
招标 | 近期隐私计算项目招标... 开放隐私计算 1招标1、江阴智慧港口公共服务平台项目名称:江阴智慧港口公共服务平台公告...
解决网页中Mixed Cont... 在Web开发中,作为开发者我们无可避免地需要引入资源文件,或者需要发起A...
redis cluster 集... master-slave -sentinel集群master 写单点,无法扩容。 ...
Java发起同步和异步HTTP... 同步与异步概念辨析 同步(synchronous)和异步(...
Kubernetes安装与集群... 一、环境准备 1、机器环境前置条件 当前演示准备3台虚拟机环境,或者是3台阿里云服务器...
simscape仿真总结2-机... 最近用simscape进行机器人的仿真,记录和总结一下学习心得和踩过的坑。 参照B站...
Redis(一):数据结构-底... 前言 从本文开始,我将分享一下近期自学 Redis 的学习笔记,其中大部...
flask教程5:abort函... 文章目录一、abort()函数的使用1.传递状态码信息2.传递响应体消息二、自定义错误处理 app....
【玩转Jetson TX2 N... 1 VMware14 Workstation Pro安装 如果没有Ubuntu系统电脑,...
2023还有人不知道kuber... 文章目录Kubernetes(K8s)一、Openstack&VM1、**认识虚拟化****1.1*...
NOI2019模拟赛 T1牛油... 题目描述 牛油果是一种神秘的水果,其具有一个坚固程度x≥0x\geq 0x≥0...
嵌入式软件开发之Linux下C... 目录 前沿 Hello World! 编写代码 编译代码 GCC编译器  gcc 命...
云原生|Rancher与Ope... 目录一、Rancher(一)介绍(二)优点&...
如何突破卫星影像建模难点?重建... 日前,由重建大师生成的首个“珞珈三号01星”卫星影像三维模型一经发出,引...
L1-085 试试手气 L1... 我们知道一个骰子有 6 个面,分别刻了 1 到 6 个点。下面给你 6 个骰子的初始状...
SpringSecurity客... 概述 FilterChainProxy是spring-security的入口,包含默认...
数据结构--二叉树 目录1.树概念及结构1.1数的概念1.2数的表示2.二叉树概念及结构2.1二叉树的概念2.2数据结构...
Qt之QUrl和QUrlQue... QUrlQUrl 类提供了一个方便的接口使用 URLs。最常见的使用QUrl 的方式是通过构造函数来...
函数指针二三事 1 什么是函数指针? ​ 函数指针,顾名思义,它是一个指向...
[ 红队知识库 ] Windo... 🍬 博主介绍 👨‍🎓 博主介绍:大家好...
【PowerBI】PowerB... 目的: 陈述PowerBI连接Mysql数据库的坑。 方法1:直接使用【...
BI数据可视化|可自动刷新的可... BI数据可视化大屏和其他的BI报表一样,都是可用于日常的决策中,因此除了...
Linux 练习十二 (Lin... 文章目录1 计算机网络基础知识1.1 OSI参考模型和TCP/IP参考模型1.2 TCP 协议1.2...
SQL语言基础教学 | Mys... SQL语言基础教学SQL(Structured Query Languageÿ...
pandas数据分析(三) 书接pandas数据分析(二) 文章目录DataFrame数据处理与分...
DC-DC升压模块隔离高压稳压... 特点● 效率高达 80%● 2*2英寸标准封装● 单双电压输出● 价格低● 大于600V高压,稳压输...
Java【多线程基础2】 Th... 文章目录前言一、Thread类1, 构造方法2, 常用成员属性3, 常用成员方法3.1, start...
TDK| 电源——反激变压器设... 电源参数根据功率、输入输出的情况,我们选择反激电源拓扑。反激式变压器的优点有:1、 电...