ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Spark2.x精通:源码剖析BypassMergeSortShuffleWriter具体实现

2021-03-10 09:51:37  阅读:223  来源: 互联网

标签:文件 shuffle BypassMergeSortShuffleWriter Spark2 分区 mapId 源码 磁盘 临时文件


一、概述


    上篇文章:Spark2.x精通:三种ShuffleWriter触发条件,我们讲了ShuffleHandle如何选择不同的ShuffleWrite策略,这里我们从源码角度剖析BypassMergeSortShuffleWriter实现策略的原理和具体的实现细节。


    BypassMergeSortShuffleWriter具体的实现都在对应类的write()函数中,我们直接看源码进行剖析

   

1.先看构造函数初始化

 BypassMergeSortShuffleWriter(      BlockManager blockManager,      IndexShuffleBlockResolver shuffleBlockResolver,      BypassMergeSortShuffleHandle<K, V> handle,      int mapId,      TaskContext taskContext,      SparkConf conf) {    // 获取spark.shuffle.file.buffer参数值,默认32k,这里是一个比较重要的条有参数,    // 该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。    // 将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘    //如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,    // 也就可以减少磁盘IO次数,进而提升性能    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;   // 是否采用NIO的从文件到文件流的复制方式,默认值是true 一般不用修改    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);    this.blockManager = blockManager;    // 获取shufflehandle中的ShuffleDependency对象,通过该对象得到分区器和分区个数等数据。    final ShuffleDependency<K, V, V> dep = handle.dependency();    this.mapId = mapId;    this.shuffleId = dep.shuffleId();    this.partitioner = dep.partitioner();    this.numPartitions = partitioner.numPartitions();    this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();    //设置序列化工具对象,和shuffleBlockResolver对象,    // 该对象用来创建和维护shuffle的数据的逻辑块和物理文件位置之间的映射的对象    this.serializer = dep.serializer();    this.shuffleBlockResolver = shuffleBlockResolver;  }

    

2.再看write()函数,源码如下:

   //这里大体意思是 为每个分区在磁盘创建临时文件  并给每一个writer


上面代码的大体思路如下:


a.确定分区数,然后为每个分区创建DiskBlockObjectWriter和临时文件


b.循环将record通过Partitioner进行分区,并写入对应分区临时文件


c. 将分区数据刷到磁盘


d.根据shuffleId和mapId,构建ShuffleDataBlockId,创建合并文件data和合并文件的临时文件,文件格式为:

shuffle_{shuffleId}_{mapId}_{reduceId}.data


e.将分区文件合并到一个总的临时文件,合并后会重命名为最终输出文件名,并返回一个对应分区文件长度的数组


f.创建索引文件index和索引临时文件,每一个分区的长度和offset写入索引文件等;并且重命名临时data文件和临时index文件


g.将一些信息封装到MapStatus返回

    

存在问题:


    这种Writer会为每个分区创建一个临时文件,如果分区过多时,会创建很多的output输出流和临时文件对象,占用资源过多,性能会下降。


标签:文件,shuffle,BypassMergeSortShuffleWriter,Spark2,分区,mapId,源码,磁盘,临时文件
来源: https://blog.51cto.com/15080019/2653915

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有