如何进行Spark Shuffle实现

12次阅读
没有评论

本篇文章为大家展示了如何进行 Spark Shuffle 实现,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

对于大数据计算框架而言,Shuffle 阶段的设计优劣是决定性能好坏的关键因素之一。丸趣 TV 小编将介绍目前 Spark 的 shuffle 实现,并将之与 MapReduce 进行简单对比。

(1) shuffle 基本概念与常见实现方式

shuffle,是一个算子,表达的是多对多的依赖关系,在类 MapReduce 计算框架中,是连接 Map 阶段和 Reduce 阶段的纽带,即每个 Reduce Task 从每个 Map Task 产生数的据中读取一片数据,极限情况下可能触发 M * R 个数据拷贝通道(M 是 Map Task 数目,R 是 Reduce Task 数目)。通常 shuffle 分为两部分:Map 阶段的数据准备和 Reduce 阶段的数据拷贝。首先,Map 阶段需根据 Reduce 阶段的 Task 数量决定每个 Map Task 输出的数据分片数目,有多种方式存放这些数据分片:

1)保存在内存中或者磁盘上(Spark 和 MapReduce 都存放在磁盘上);

2)每个分片一个文件(现在 Spark 采用的方式,若干年前 MapReduce 采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在 MapReduce 采用的方式)。

在 Map 端,不同的数据存放方式各有优缺点和适用场景。一般而言,shuffle 在 Map 端的数据要存储到磁盘上,以防止容错触发重算带来的庞大开销(如果保存到 Reduce 端内存中,一旦 Reduce Task 挂掉了,所有 Map Task 需要重算)。但数据在磁盘上存放方式有多种可选方案,在 MapReduce 前期设计中,采用了现在 Spark 的方案(目前一直在改进),每个 Map Task 为每个 Reduce Task 产生一个文件,该文件只保存特定 Reduce Task 需处理的数据,这样会产生 M * R 个文件,如果 M 和 R 非常庞大,比如均为 1000,则会产生 100w 个文件,产生和读取这些文件会产生大量的随机 IO,效率非常低下。解决这个问题的一种直观方法是减少文件数目,常用的方法有:

        1) 将一个节点上所有 Map 产生的文件合并成一个大文件(MapReduce 现在采用的方案),

        2) 每个节点产生 {(slot 数目)*R} 个文件(Spark 优化后的方案)。对后面这种方案简单解释一下:不管是 MapReduce 1.0 还是 Spark,每个节点的资源会被抽象成若干个 slot,由于一个 Task 占用一个 slot,因此 slot 数目可看成是最多同时运行的 Task 数目。如果一个 Job 的 Task 数目非常多,限于 slot 数目有限,可能需要运行若干轮。这样,只需要由第一轮产生 {(slot 数目)*R} 个文件,后续几轮产生的数据追加到这些文件末尾即可。

        因此,后一种方案可减少大作业产生的文件数目。

        在 Reduce 端,各个 Task 会并发启动多个线程同时从多个 Map Task 端拉取数据。由于 Reduce 阶段的主要任务是对数据进行按组规约。

        也就是说,需要将数据分成若干组,以便以组为单位进行处理。大家知道,分组的方式非常多,常见的有:Map/HashTable(key 相同的,放到同一个 value list 中)和 Sort(按 key 进行排序,key 相同的一组,经排序后会挨在一起),这两种方式各有优缺点,第一种复杂度低,效率高,但是需要将数据全部放到内存中,第二种方案复杂度高,但能够借助磁盘(外部排序)处理庞大的数据集。Spark 前期采用了第一种方案,而在最新的版本中加入了第二种方案,MapReduce 则从一开始就选用了基于 sort 的方案。

(2)MapReduce Shuffle 发展史

【阶段 1】:MapReduce Shuffle 的发展也并不是一马平川的,刚开始(0.10.0 版本之前)采用了“每个 Map Task 产生 R 个文件”的方案,前面提到,该方案会产生大量的随机读写 IO,对于大数据处理而言,非常不利。

【阶段 2】:为了避免 Map Task 产生大量文件,HADOOP-331 尝试对该方案进行优化,优化方法:为每个 Map Task 提供一个环形 buffer,一旦 buffer 满了后,则将内存数据 spill 到磁盘上(外加一个索引文件,保存每个 partition 的偏移量),最终合并产生的这些 spill 文件,同时创建一个索引文件,保存每个 partition 的偏移量。

(阶段 2):这个阶段并没有对 shuffle 架构做调成,只是对 shuffle 的环形 buffer 进行了优化。在 Hadoop 2.0 版本之前,对 MapReduce 作业进行参数调优时,Map 阶段的 buffer 调优非常复杂的,涉及到多个参数,这是由于 buffer 被切分成两部分使用:一部分保存索引(比如 parition、key 和 value 偏移量和长度),一部分保存实际的数据,这两段 buffer 均会影响 spill 文件数目,因此,需要根据数据特点对多个参数进行调优,非常繁琐。而 MAPREDUCE-64 则解决了该问题,该方案让索引和数据共享一个环形缓冲区,不再将其分成两部分独立使用,这样只需设置一个参数控制 spill 频率。

【阶段 3(进行中)】:目前 shuffle 被当做一个子阶段被嵌到 Reduce 阶段中的。由于 MapReduce 模型中,Map Task 和 Reduce Task 可以同时运行,因此一个作业前期启动的 Reduce Task 将一直处于 shuffle 阶段,直到所有 Map Task 运行完成,而在这个过程中,Reduce Task 占用着资源,但这部分资源利用率非常低,基本上只使用了 IO 资源。为了提高资源利用率,一种非常好的方法是将 shuffle 从 Reduce 阶段中独立处理,变成一个独立的阶段 / 服务,由专门的 shuffler service 负责数据拷贝,目前百度已经实现了该功能(准备开源?),且收益明显,具体参考:MAPREDUCE-2354。

(3)Spark Shuffle 发展史

目前看来,Spark Shuffle 的发展史与 MapReduce 发展史非常类似。初期 Spark 在 Map 阶段采用了“每个 Map Task 产生 R 个文件”的方法,在 Reduce 阶段采用了 map 分组方法,但随 Spark 变得流行,用户逐渐发现这种方案在处理大数据时存在严重瓶颈问题,因此尝试对 Spark 进行优化和改进,相关链接有:External Sorting for Aggregator and CoGroupedRDDs,“Optimizing Shuffle Performance in Spark”,“Consolidating Shuffle Files in Spark”,优化动机和思路与 MapReduce 非常类似。

Spark 在前期设计中过多依赖于内存,使得一些运行在 MapReduce 之上的大作业难以直接运行在 Spark 之上(可能遇到 OOM 问题)。目前 Spark 在处理大数据集方面尚不完善,用户需根据作业特点选择性的将一部分作业迁移到 Spark 上,而不是整体迁移。随着 Spark 的完善,很多内部关键模块的设计思路将变得与 MapReduce 升级版 Tez 非常类似。

上述内容就是如何进行 Spark Shuffle 实现,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注丸趣 TV 行业资讯频道。