参考链接:
1、MapReduce 入门
1.1、MapReduce概念
hadoop 的四大组件:
- HDFS:分布式存储系统
- MapReduce:分布式计算系统
- YARN:hadoop 的资源调度系统
- Common:以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等
MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用” 的核心框架
MapReduce 核心功能 :将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布 式运算程序,并发运行在一个 Hadoop 集群上
1.2、为什么需要 MapReduce?
引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理
Hadoop 当中的 MapReduce 分布式程序运算框架的整体结构如下:
MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行
MapTask:阶段并发任,负责 mapper 阶段的任务处理
YARNChild
ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理
YARNChild
1.3、MapReduce 的编写规范
MapReduce 程序编写规范:
- 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)
- Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
- Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
- Mapper 中的业务逻辑写在 map()方法中
- map()方法(maptask 进程)对每一个<K,V>调用一次
- Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式
- Reducer 的业务逻辑写在 reduce()方法中
- Reducetask 进程对每一组相同 k 的<K,V>组调用一次 reduce()方法
- 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
- 整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象
1.4、WordCount 程序
1、业务逻辑
maptask阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成一个 key-value对,比如单词 hello,就转换成<’hello’,1>发送给 reducetask去汇总
reducetask阶段将接受 maptask的结果,来做汇总计数
2、具体代码实现
1 | Map |
1.5 小文件优化

需要设置最大值和最小值, 会切分文件, 在最大值与最小值之间
2、MapReduce 程序的核心运行机制
2.1、概述
一个完整的 MapReduce 程序在分布式运行时有两类实例进程:
- MRAppMaster:负责整个程序的过程调度及状态协调
- Yarnchild:负责 map 阶段的整个数据处理流程
- Yarnchild:负责 reduce 阶段的整个数据处理流程
以上两个阶段 MapTask 和 ReduceTask 的进程都是 YarnChild , 并不是说这 MapTask 和 ReduceTask 就跑在同一个 YarnChild 进行里
2.2、MapReduce 程序的运行流程
- 一个 mr 程序启动的时候,最先启动的是 MRAppMaster,MRAppMaster 启动后根据本次 job 的描述信息,计算出需要的 maptask 实例数量,然后向集群申请机器启动相应数量的 maptask 进程
- maptask 进程启动之后,根据给定的数据切片(哪个文件的哪个偏移量范围)范围进行数 据处理,主体流程为:
- 利用客户指定的 InputFormat 来获取 RecordReader 读取数据,形成输入 KV 对
- 将输入 KV 对传递给客户定义的 map()方法,做逻辑运算,并将 map()方法输出的 KV 对收 集到缓存
- 将缓存中的 KV 对按照 K 分区排序后不断溢写到磁盘文件
- MRAppMaster 监控到所有 maptask 进程任务完成之后(真实情况是,某些 maptask 进 程处理完成后,就会开始启动 reducetask 去已完成的 maptask 处 fetch 数据),会根据客户指 定的参数启动相应数量的 reducetask 进程,并告知 reducetask 进程要处理的数据范围(数据 分区)
- Reducetask 进程启动之后,根据 MRAppMaster 告知的待处理数据所在位置,从若干台 maptask 运行所在机器上获取到若干个 maptask 输出结果文件,并在本地进行重新归并排序, 然后按照相同 key 的 KV 为一个组,调用客户定义的 reduce()方法进行逻辑运算,并收集运 算输出的结果 KV,然后调用客户指定的 OutputFormat 将结果数据输出到外部存储
2.3、MapTask 并行度决定机制
将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多 个 split),然后每一个 split 分配一个 mapTask 并行实例处理。
这段逻辑及形成的切片规划描述文件,是由 FileInputFormat 实现类的 getSplits()方法完成的。 该方法返回的是 List
2.4、切片机制

FileInputFormat 中默认的切片机制
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于 block 大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
比如待处理数据有两个文件:
File1.txt 200M
File2.txt 100M
经过 getSplits()方法处理之后,形成的切片信息是:
File1.txt-split1 0-128M
File1.txt-split2 129M-200M
File2.txt-split1 0-100M
FileInputFormat 中切片的大小的参数配置
1 | // 通过分析源码,在 FileInputFormat 中,计算切片大小的逻辑: |
2.5、MapTask 并行度经验之谈
如果硬件配置为 2*12core + 64G,恰当的 map 并行度是大约每个节点 20-100 个 map,最好 每个 map 的执行时间至少一分钟。
- 如果 job 的每个 map 或者 reduce task 的运行时间都只有 30-40 秒钟,那么就减少该 job 的 map 或者 reduce 数,每一个 task(map|reduce)的 setup 和加入到调度器中进行调度,这个 中间的过程可能都要花费几秒钟,所以如果每个 task 都非常快就跑完了,就会在 task 的开 始和结束的时候浪费太多的时间。
配置 task 的 JVM 重用可以改善该问题:
- mapred.job.reuse.jvm.num.tasks,默认是 1,表示一个 JVM 上最多可以顺序执行的 task 数目(属于同一个 Job)是 1。也就是说一个 task 启一个 JVM。
- 这个值可以在 mapred-site.xml 中进行更改,当设置成多个,就意味着这多个 task 运行在同一个 JVM 上,但不是同时执行, 是排队顺序执行
- 如果 input 的文件非常的大,比如 1TB,可以考虑将 hdfs 上的每个 blocksize 设大,比如 设成 256MB 或者 512MB
2.6、ReduceTask 并行度决定机制
reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由 切片数决定不同,Reducetask 数量的决定是可以直接手动设置:
1 | // 设置 ReduceTask 的并行度 |
默认值是 1,
手动设置为 4,表示运行 4 个 reduceTask,
设置为 0,表示不运行 reduceTask 任务,也就是没有 reducer 阶段,只有 mapper 阶段
如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜
注意:reducetask 数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有 1 个 reducetask
尽量不要运行太多的 reducetask。对大多数 job 来说,最好 rduce 的个数最多和集群中的 reduce 持平,或者比集群的 reduce slots 小。这个对于小集群而言,尤其重要。
最好的ReduceTask 个数是:datanode 个数 * 0.75~0.95 左右
2.7 客户端提交 MR 程序 Job 的流程

3. 昨日复习
1.MapReduce 的 wc 编程
- 手写代码
- Mapper
- Reducer
- Driver
2.MapTask 的并行度
- 在程序执行的时候运行的 maptask 的总个数
3.ReduceTask的并行度问题
- ReduceTask 的并行度设置依赖于自己传入的参数
- 一般经验: ReduceTask 的个数应该 = datanode 的阶段数 * (0.75~0.95)
- ReduceTask 在设置的时候的并行度有一定的瓶颈
- 分区: 决定 ReduceTask 中的数据怎么分配的
- 默认分区方式
- 自定义分区