Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

第六章《Shuffle 机制》勘误与修改建议 #7

Open
JerryLead opened this issue Jul 20, 2020 · 8 comments
Open

第六章《Shuffle 机制》勘误与修改建议 #7

JerryLead opened this issue Jul 20, 2020 · 8 comments

Comments

@JerryLead
Copy link
Owner

No description provided.

@hangjianglaoweng
Copy link

hangjianglaoweng commented Apr 12, 2021

第171页,6.4.1 AppendOnlyMap的原理,
“如果该位置已经被占用,就使用二次探测法来找下一个空闲位置”,源码的实现应该是“线性探测法”不是“二次探测法”。
image

所以,图6.12中
第一次定位应该是Hash(K6)*2
第二次定位应该是(Hash(K6)+1)*2
第三次定位应该是(Hash(K6)+1+2)*2

如果有第四次应该是(Hash(K6)+1+2+3)*2

这里面乘以2是因为每次key和value一起放入占两个位置,如果是占一个位置,其实就是线性探测法的公式hi=(h(key)+i)%m。
不知道说的对不对,请指正。

@JerryLead
Copy link
Owner Author

@hangjianglaoweng 感谢指出,目测目前的代码实现用了线性探测法。然而,AppendOnlyMap的注释中说使用了二次探测法(This implementation uses quadratic probing with a power-of-2 hash table * size, which is guaranteed to explore all spaces for each key (see * http://en.wikipedia.org/wiki/Quadratic_probing)。想要深究的话,可以给Spark提个issue,看社区到底是想采用二次探测还是线性探测。

@MingRongXi
Copy link

MingRongXi commented Jan 1, 2023

利杰你好,我在用spark-core2.13做debug的时候,发现代码里有些流程和书里描述的不太一样
ShuffleWrite框架的设计与实现

  1. 数组的扩容机制
    "如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上"
    这里描述的流程与实际代码不符,实际代码是如果存放不下,会直接扩容为原来的二倍,在扩容完之后在把数据存进去,存完以后再判断是否需要溢写,所以这里我认为这里在扩容时会有OOM的风险;并且如果curSize=MAXIMUM_CAPACITY,则会直接抛异常
    image
  2. Map的扩容机制
    与1类似,都是扩容以后再判断是否需要溢写
  3. 需要map()端combine,需要或者不需要按Key进行排序
    “如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”
    实际上是如果不需要按Key排序,那么按照 partitionId + hash(Key) 排序,这里我猜测是为了在合并溢写文件的时候效率更高
    image

@JerryLead
Copy link
Owner Author

JerryLead commented Jan 3, 2023

@MingRongXi

第一个问题:数组的扩容机制
"如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上"
这里描述的流程与实际代码不符,实际代码是如果存放不下,会直接扩容为原来的二倍,在扩容完之后在把数据存进去,存完以后再判断是否需要溢写,所以这里我认为这里在扩容时会有OOM的风险;并且如果curSize=MAXIMUM_CAPACITY,则会直接抛异常

==> Lijie: “如果Array存放不下,则会先扩容” 指的是“如果存放不下,会扩容为原来的二倍,扩容完后可以存放新的数据”。
“如果还存放不下,就将Array中的record排序后spill到磁盘上" 指的是“如果没有扩容的空间或者存放的数据总大小超过阈值,将Array中的record排序后spill到磁盘上”。我记得Spark会定时估计Array大小,如果超过阈值就spill到磁盘,所以一般不会出现OOM。但由于是定时的粗略估计,而不是精确统计(需要统计所有对象的大小太耗时),所以在存放过程中仍可能存在OOM。当然,还有很多OOM的情况,可以参考我们之前的paper:https://jerrylead.github.io/papers/ISSRE-2015-OOM.pdf
如果你在实际运行中这里出现了OOM情况,可以报告给社区。

第二个问题:Map的扩容机制
与1类似,都是扩容以后再判断是否需要溢写
==> Lijie: 我将描述简化为“如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上"就是为了避免后续代码更改带来一些细节描述出入。本质来讲,空间不足时先扩容,不能扩容达到阈值就spill。

第三个问题:需要map()端combine,需要或者不需要按Key进行排序
“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”
实际上是如果不需要按Key排序,那么按照 partitionId + hash(Key) 排序,这里我猜测是为了在合并溢写文件的时候效率更高

=> Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second by key or hash key",也就是说在对partition id排序后,可能进行对key/hash(key)的排序,而不是一定。

@MingRongXi
Copy link

@JerryLead 感谢利杰回复,第一点和第二点是我理解的不到位,关于第三点,我看代码确认了下
=> Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second by key or hash key",也就是说在对partition id排序后,可能进行对key/hash(key)的排序,而不是一定。
=> MingRongXi:在SortShuffleWriter insert完数据后,会调用writePartitionedMapOutput方法,writePartitionedMapOutput里调用了comparator,而comparator只有在map端不需要排序且不需要聚合的时候才会返回None,如果需要map端聚合,就会使用hash(Key) 或者 Key 排序
image
image
image

@JerryLead
Copy link
Owner Author

@MingRongXi

关于第三点,
=> Lijie: 书中说“如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序”,确实可以这样做,不需要key排序时,我们实际上没有必要对 key/hash(Key) 排序,比如如果不需要spill,直接在内存中进行combine,然后按照partition id排序后输出数据即可。如果需要spill,那么可以在spill过程中对key/hash(key)排序方便merge。这里英文里面也说"possibly second by key or hash key",也就是说在对partition id排序后,可能进行对key/hash(key)的排序,而不是一定。
=> MingRongXi:在SortShuffleWriter insert完数据后,会调用writePartitionedMapOutput方法,writePartitionedMapOutput里调用了comparator,而comparator只有在map端不需要排序且不需要聚合的时候才会返回None,如果需要map端聚合,就会使用hash(Key) 或者 Key 排序
=> Lijie: 从设计角度来说,map端如果需要combine但不需要key排序,那么只需区分partition Id ,并能够按照key进行combine即可,这时有两种情况:如果内存能放大的下,不必进行key/hash(key)排序;如果内存放不下需要spill,那么可以在spill前再次对key/hash(key)排序方便merge(如书中AppendOnlyMap的图示)。当然实现时,为了方便或者统一接口,两种情况下可以都对key/hash(key)排序。本书更注重的是设计方法,开阔思路,而非完全遵照Spark现有实现,而且实现是在不断迭代优化,也许后面会有更好的实现方法。

@MingRongXi
Copy link

好的,理解了,感谢!

@qijiale76
Copy link

qijiale76 commented Dec 29, 2023

@hangjianglaoweng 感谢指出,目测目前的代码实现用了线性探测法。然而,AppendOnlyMap的注释中说使用了二次探测法(This implementation uses quadratic probing with a power-of-2 hash table * size, which is guaranteed to explore all spaces for each key (see * http://en.wikipedia.org/wiki/Quadratic_probing)。想要深究的话,可以给Spark提个issue,看社区到底是想采用二次探测还是线性探测。

参考 SPARK-4690,代码使用的是二次探测法。
第0次位置P(0)=Hash(K);
第1次探测位置P(1)=Hash(K)+1;
第2次探测位置P(2)=P(1)+2=Hash(K)+3;
递推公式为P(i+1)=P(i)+i;
通项公式为P(i+1)=Hash(K)+(n+1)*n/2;
关键点在于看通项公式的次幂而不是递推公式的次幂,因此是二次探测而不是线性探测。
书中文字和插图示例也存在问题,即第三次定位应为向后递增3个record位置,而不是4个record位置。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants