MapReduce笔记-2 三大组件-Partitioner分区,sort排序,Combiner局部分区

1. Combiner 组件

1. 产生缘由:

Combiner 是 MapReduce 程序中 Mapper 和 Reducer 之外的一种组件,它的作用是在 maptask 之后给 maptask 的结果进行局部汇总,以减轻 reducetask 的计算负载,减少网络传输

Combiner 组件的作用:

  • 减少 reduce 端的数据量
  • 减少 shuffle 过程的数据量
  • 在 map 端做了一次合并,提高分布式计算程序的整体性能

Combiner 组件帮 reduce 分担压力, 因此其业务逻辑和 reduce 中的业务逻辑相似

2.自定义 Combiner 组件:

默认情况下没有 Combiner 组件,Combiner 作用时间点 — map–combiner–reduce

  1. 继承 Reduce 类

    • public class MyCombiner extends Reducer<前两个: map 的输出, 后两个: reduce 的输入>{}
    • 我们在写 MapReduce 程序的时候, map 的输出就是 reduce 的输入
    • 也就是说, 这个 MyCombiner() 的前两个泛型和后两个泛型的类型一致
  2. 重写 reduce 方法

    • Combiner 本质上相当于 在 map 端进行了一次 reduce 操作, 通常情况下直接使用 reducer 的类作为 Combiner 的类,不再单独写 Combiner 代码逻辑

    • 在 Job 中加上job.setCombinerClass(WorldcountReduce.class), 就会调用 Combiner

  1. Combiner 使用原则

    • 有或没有都不能影响业务逻辑,都不能影响最终结果。比如累加,最大值等,求平均值就不能用。

2、MapReduce 中的序列化

2.1、概述

Java序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额 外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输

Hadoop 自己开发了一套序列化机制(参与序列化的对象的类都要实现 Writable 接口),精简,高效

Java 基本类型 & Hadoop 类型对照表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Java & Hadoop类型参照
hadoop数据类型 <------------> java数据类型:
布尔型:
BooleanWritable <------------> boolean
整型:
ByteWritable <------------> byte
ShortWritable <------------> short
IntWritable <------------> int
LongWritable <------------> long
浮点型:
FloatWritable <------------> float
DoubleWritable <------------> double
字符串(文本):
Text <------------> String
数组:
ArrayWritable <------------> Array
map集合:
MapWritable <------------> map

2.2、自定义对象实现 MapReduce 框架的序列化

要实现WritableComparable接口,因为 MapReduce 框架中的 shuffle 过程一定会对 key 进行排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phone);
out.writeLong(upfFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

//反序列化方法
//注意: 字段的反序列化顺序与序列化时的顺序保持一致,並且类型也一致
@Override
public void readFields(DataInput in) throws IOException {
this.phone = in.readUTF();
this.upfFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}

3. MapReduce中的Sort –TODO。。

MapTask –> ReduceTask 之间, 框架默认添加了排序

排序的规则是按照Map 端输出的 key 的字典顺序进行排序

1、 如果没有重写 WritableComparable 时

 按单词统计中词频出现的此处进行排序, 按照出现的次数, 从低到高

如果想要对词频进行排序, 那么词频应该放在 map 输出 key 的位置

代码实现

1
2
3
4
5
6
7
 Map 
//词频为 key, 其它为 value

 Reduce 
// 将 map 输入的结果反转(k,v 换位置), 输出最终结果
// 最后输出还是按照左边词, 右边次数
// ps: 如果倒序排的时候, map 的时候发的时候 加上-, reduce 发的时候, 再加上-, 转成 IntWritable
2、自定义排序要实现WritableComparable接口
  • 自定义的类必须放在 key 的位置
  • 实现WritableComparable接口, 重写 compareTo()方法
  • 待扩展…

作业: 增强需求: 按照总流量排序, 总流量相同时, 按照手机号码排序


4、MapReduce 中的数据分发组件 Partitioner(分区)

需求: 根据归属地输出流量统计数据结果到不同文件,以便于在查询统计结果时可以定位到 省级范围进行

思路:MapReduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask

执行时机: 在Map输出 kv 对之后, 所携带的 k,v 参数,跟 Map 输出相同


MapReduce 默认的分发规则为

根据 keyhashcode%reducetask 数来分发,所以:如果要按照我们自 己的需求进行分组,则需要改写数据分发(分区)组件 Partitioner


Partition重点总结:

  • Partition 的 key value, 就是Mapper输出的key value

    public abstract int getPartition(KEY key, VALUE value, int numPartitions);

    输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)

  • MapReduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask 默认的分发规则为:根据 key 的 hashcode%reducetask 数来分发,所以:如果要按照我们自 己的需求进行分组,则需要改写数据分发(分组)组件 Partitioner, 自定义一个 CustomPartitioner 继承抽象类:Partitioner

  • 因此, Partitioner 的执行时机, 是在Map输出 kv 对之后
Partitioner 实现过程
  1. 先分析一下具体的业务逻辑,确定大概有多少个分区
  2. 首先书写一个类,它要继承 org.apache.hadoop.mapreduce.Partitioner这个抽象类
  3. 重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字
  4. main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);
  5. 设置Reducer的数量,job.setNumReduceTasks(6);

典型的 Partitioner 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
private static HashMap<String, Integer> provincMap = new HashMap<String, Integer>();
static {
provincMap.put("138", 0);
provincMap.put("139", 1);
provincMap.put("136", 2);
provincMap.put("137", 3);
provincMap.put("135", 4);
}

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
Integer code = provincMap.get(key.toString().substring(0, 3));
if (code != null) {
return code;
}
return 5;
}
}

5、全局计数器

1. 框架内置计数器:

  • Hadoop内置的计数器,主要用来记录作业的执行情况
  • 内置计数器包括 MapReduce框架计数器(Map-Reduce Framework)
    • 文件系统计数器(FielSystemCounters)
    • 作业计数器(Job Counters)
    • 文件输入格式计数器(File Output Format Counters)
    • 文件输出格式计数器(File Input Format Counters)
  • 计数器由相关的task进行维护,定期传递给tasktracker,再由tasktracker传给jobtracker;
  • 最终的作业计数器实际上是有jobtracker维护,所以计数器可以被全局汇总,同时也不必在整个网络中传递
  • 只有当一个作业执行成功后,最终的计数器的值才是完整可靠的;

2. 自定义的计数器

应用场景
  • 用来统计运行过程中的进度和状态, 类似于 job 运行的一个报告、日志
  • 要将数据处理过程中遇到的不合规数据行进行全局计数,类似这 种需求可以借助 MapReduce 框架中提供的全局计数器来实现
  • 计数器的值可以在mapper或reducer中增加

使用方式

  1. 定义枚举类

    1
    2
    3
    4
    enum Temperature{  
    MISSING,
    TOTAL
    }
  2. 在map或者reduce中使用计数器

    1
    2
    3
    4
    5
    6
    // 1.自定义计数器
    Counter counter = context.getCounter(Temperature.TOTAL);
    // 2.为计数器赋初始值
    counter.setValue(long value);
    // 3.计数器工作
    counter.increment(long incr);
  3. 获取计数器

    1
    2
    3
    Counters counters=job.getCounters(); 
    Counter counter=counters.findCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG);// 查找枚举计数器,假如Enum的变量为BAD_RECORDS_LONG
    long value=counter.getValue();//获取计数值

计数器使用完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/** 
* @Description 假如一个文件,规范的格式是3个字段,“\t”作为分隔符,其中有2条异常数据,一条数据是只有2个字段,一条数据是有4个字段
*/
public class MyCounter {
// \t键
private static String TAB_SEPARATOR = "\t";

public static class MyCounterMap extends
Mapper<LongWritable, Text, Text, Text> {
// 定义枚举对象
public static enum LOG_PROCESSOR_COUNTER {
BAD_RECORDS_LONG, BAD_RECORDS_SHORT
};

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String arr_value[] = value.toString().split(TAB_SEPARATOR);
if (arr_value.length > 3) {
/* 自定义计数器 */
context.getCounter("ErrorCounter", "toolong").increment(1);
/* 枚举计数器 */
context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_LONG).increment(1);
} else if (arr_value.length < 3) {
// 自定义计数器
context.getCounter("ErrorCounter", "tooshort").increment(1);
// 枚举计数器
context.getCounter(LOG_PROCESSOR_COUNTER.BAD_RECORDS_SHORT).increment(1);
}
}
}

@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
String[] args0 = {
"hdfs://hadoop2:9000/buaa/counter/counter.txt",
"hdfs://hadoop2:9000/buaa/counter/out/"
};
// 读取配置文件
Configuration conf = new Configuration();

// 如果输出目录存在,则删除
Path mypath = new Path(args0[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}

// 新建一个任务
Job job = new Job(conf, "MyCounter");
// 主类
job.setJarByClass(MyCounter.class);
// Mapper
job.setMapperClass(MyCounterMap.class);

// 输入目录
FileInputFormat.addInputPath(job, new Path(args0[0]));
// 输出目录
FileOutputFormat.setOutputPath(job, new Path(args0[1]));

// 提交任务,并退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

注意点:在没有 ReduceTask 的时候, job.setNumReduceTasks(0);

关于计数器,详情可参考


如果帮到你, 可以给我赞助杯咖啡☕️
0%