1. Combiner 组件
1. 产生缘由:
Combiner 是 MapReduce 程序中 Mapper 和 Reducer 之外的一种组件,它的作用是在 maptask 之后给 maptask 的结果进行局部汇总,以减轻 reducetask 的计算负载,减少网络传输
Combiner 组件的作用:
- 减少 reduce 端的数据量
- 减少 shuffle 过程的数据量
- 在 map 端做了一次合并,提高分布式计算程序的整体性能
Combiner 组件帮 reduce 分担压力, 因此其业务逻辑和 reduce 中的业务逻辑相似
2.自定义 Combiner 组件:
默认情况下没有 Combiner 组件,Combiner 作用时间点 — map–combiner–reduce
继承 Reduce 类
- public class MyCombiner extends Reducer<前两个: map 的输出, 后两个: reduce 的输入>{}
- 我们在写 MapReduce 程序的时候, map 的输出就是 reduce 的输入
- 也就是说, 这个 MyCombiner() 的前两个泛型和后两个泛型的类型一致
重写 reduce 方法
Combiner 本质上相当于 在 map 端进行了一次 reduce 操作, 通常情况下直接使用 reducer 的类作为 Combiner 的类,不再单独写 Combiner 代码逻辑
在 Job 中加上
job.setCombinerClass(WorldcountReduce.class)
, 就会调用 Combiner
Combiner 使用原则
- 有或没有都不能影响业务逻辑,都不能影响最终结果。比如累加,最大值等,求平均值就不能用。
2、MapReduce 中的序列化
2.1、概述
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额 外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输;
Hadoop 自己开发了一套序列化机制(参与序列化的对象的类都要实现 Writable 接口),精简,高效
Java 基本类型 & Hadoop 类型对照表
1 | // Java & Hadoop类型参照 |
2.2、自定义对象实现 MapReduce 框架的序列化
要实现WritableComparable
接口,因为 MapReduce 框架中的 shuffle 过程一定会对 key 进行排序
1 | //序列化方法 |
3. MapReduce中的Sort –TODO。。
MapTask –> ReduceTask 之间, 框架默认添加了排序
排序的规则是按照Map 端输出的 key 的字典顺序进行排序
1、 如果没有重写 WritableComparable 时
按单词统计中词频出现的此处进行排序, 按照出现的次数, 从低到高
如果想要对词频进行排序, 那么词频应该放在 map 输出 key 的位置
代码实现:
1 | Map |
2、自定义排序要实现WritableComparable
接口
- 自定义的类必须放在 key 的位置
- 实现
WritableComparable
接口, 重写compareTo()
方法 - 待扩展…
作业: 增强需求: 按照总流量排序, 总流量相同时, 按照手机号码排序
4、MapReduce 中的数据分发组件 Partitioner(分区)
需求: 根据归属地输出流量统计数据结果到不同文件,以便于在查询统计结果时可以定位到 省级范围进行
思路:MapReduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask
执行时机: 在Map输出 kv 对之后, 所携带的 k,v 参数,跟 Map 输出相同
MapReduce 默认的分发规则为:
根据 key
的 hashcode%reducetask
数来分发,所以:如果要按照我们自 己的需求进行分组,则需要改写数据分发(分区)组件 Partitioner
Partition重点总结:
Partition 的 key value, 就是Mapper输出的key value
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)。
MapReduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask 默认的分发规则为:根据 key 的 hashcode%reducetask 数来分发,所以:如果要按照我们自 己的需求进行分组,则需要改写数据分发(分组)组件 Partitioner, 自定义一个 CustomPartitioner 继承抽象类:Partitioner
- 因此, Partitioner 的执行时机, 是在Map输出 kv 对之后
Partitioner 实现过程
- 先分析一下具体的业务逻辑,确定大概有多少个分区
- 首先书写一个类,它要继承
org.apache.hadoop.mapreduce.Partitioner
这个抽象类 - 重写
public int getPartition
这个方法,根据具体逻辑,读数据库或者配置返回相同的数字 - 在
main
方法中设置Partioner
的类,job.setPartitionerClass(DataPartitioner.class)
; - 设置
Reducer
的数量,job.setNumReduceTasks(6)
;
典型的 Partitioner 代码实现
1 | import java.util.HashMap; |
5、全局计数器
1. 框架内置计数器:
- Hadoop内置的计数器,主要用来记录作业的执行情况
- 内置计数器包括 MapReduce框架计数器(Map-Reduce Framework)
- 文件系统计数器(FielSystemCounters)
- 作业计数器(Job Counters)
- 文件输入格式计数器(File Output Format Counters)
- 文件输出格式计数器(File Input Format Counters)
- 计数器由相关的task进行维护,定期传递给tasktracker,再由tasktracker传给jobtracker;
- 最终的作业计数器实际上是有jobtracker维护,所以计数器可以被全局汇总,同时也不必在整个网络中传递
- 只有当一个作业执行成功后,最终的计数器的值才是完整可靠的;
2. 自定义的计数器
应用场景
- 用来统计运行过程中的进度和状态, 类似于 job 运行的一个报告、日志
- 要将数据处理过程中遇到的不合规数据行进行全局计数,类似这 种需求可以借助 MapReduce 框架中提供的全局计数器来实现
- 计数器的值可以在mapper或reducer中增加
使用方式
定义枚举类
1
2
3
4enum Temperature{
MISSING,
TOTAL
}在map或者reduce中使用计数器
1
2
3
4
5
6// 1.自定义计数器
Counter counter = context.getCounter(Temperature.TOTAL);
// 2.为计数器赋初始值
counter.setValue(long value);
// 3.计数器工作
counter.increment(long incr);获取计数器
1
2
3Counters counters=job.getCounters();
Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG);// 查找枚举计数器,假如Enum的变量为BAD_RECORDS_LONG
long value=counter.getValue();//获取计数值
计数器使用完整代码
1 | /** |
注意点:在没有 ReduceTask 的时候, job.setNumReduceTasks(0);