MapReduce笔记-1 WordCount, MapReduce运行机制

参考链接:

hdfs 笔记

mapreduce 笔记

1、MapReduce 入门

1.1、MapReduce概念

hadoop 的四大组件:

  • HDFS:分布式存储系统
  • MapReduce:分布式计算系统
  • YARN:hadoop 的资源调度系统
  • Common:以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用” 的核心框架

MapReduce 核心功能将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布 式运算程序并发运行在一个 Hadoop 集群

1.2、为什么需要 MapReduce?

引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理

Hadoop 当中的 MapReduce 分布式程序运算框架整体结构如下:

MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行

MapTask:阶段并发任,负责 mapper 阶段的任务处理

YARNChild

ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理

YARNChild

1.3、MapReduce 的编写规范

MapReduce 程序编写规范:

  1. 用户编写的程序分成三个部分Mapper,Reducer,Driver(提交运行 MR 程序的客户端)
  2. Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
  3. Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
  4. Mapper 中的业务逻辑写在 map()方法中
  5. map()方法(maptask 进程)对每一个<K,V>调用一次
  6. Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式
  7. Reducer 的业务逻辑写在 reduce()方法中
  8. Reducetask 进程对每一组相同 k 的<K,V>组调用一次 reduce()方法
  9. 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
  10. 整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象

1.4、WordCount 程序

1、业务逻辑

  1. maptask阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成一个 key-value对,比如单词 hello,就转换成<’hello’,1>发送给 reducetask去汇总

  2. reducetask阶段将接受 maptask的结果,来做汇总计数

2、具体代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
 Map 
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 计算任务代码:切割单词,输出每个单词计 1 的 key-value 对
String[] words = value.toString().split(" ");
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}


 Reduce 
static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 汇总计算代码:对每个 key 相同的一组 key-value 做汇总统计
int sum = 0;
for (IntWritable v : values) {
sum += v.get();
}
context.write(key, new IntWritable(sum));
}
}


 main 

public static void main(String[] args) throws Exception {
// 指定 hdfs 相关的参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");

// 新建一个 job 任务
Job job = Job.getInstance(conf);

// 设置 jar 包所在路径
job.setJarByClass(WordCountMR.class);

// 指定 mapper 类和 reducer 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 指定 maptask 的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 指定 reducetask 的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 指定该 mapreduce 程序数据的输入和输出路径
Path inputPath = new Path("/wordcount/input");
Path outputPath = new Path("/wordcount/output");
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);

// 最后提交任务
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion?0:1);
}

1.5 小文件优化

image-20180617203857037

需要设置最大值和最小值, 会切分文件, 在最大值与最小值之间

2、MapReduce 程序的核心运行机制

2.1、概述

一个完整的 MapReduce 程序在分布式运行时有两类实例进程:

  1. MRAppMaster:负责整个程序的过程调度及状态协调
  2. Yarnchild:负责 map 阶段的整个数据处理流程
  3. Yarnchild:负责 reduce 阶段的整个数据处理流程

以上两个阶段 MapTask 和 ReduceTask 的进程都是 YarnChild , 并不是说这 MapTask 和 ReduceTask 就跑在同一个 YarnChild 进行里

2.2、MapReduce 程序的运行流程

  1. 一个 mr 程序启动的时候,最先启动的是 MRAppMaster,MRAppMaster 启动后根据本次 job 的描述信息,计算出需要的 maptask 实例数量,然后向集群申请机器启动相应数量的 maptask 进程
  2. maptask 进程启动之后,根据给定的数据切片(哪个文件的哪个偏移量范围)范围进行数 据处理,主体流程为:
    • 利用客户指定的 InputFormat 来获取 RecordReader 读取数据,形成输入 KV 对
    • 将输入 KV 对传递给客户定义的 map()方法,做逻辑运算,并将 map()方法输出的 KV 对收 集到缓存
    • 将缓存中的 KV 对按照 K 分区排序后不断溢写到磁盘文件
  3. MRAppMaster 监控到所有 maptask 进程任务完成之后(真实情况是,某些 maptask 进 程处理完成后,就会开始启动 reducetask 去已完成的 maptask 处 fetch 数据),会根据客户指 定的参数启动相应数量的 reducetask 进程,并告知 reducetask 进程要处理的数据范围(数据 分区)
  4. Reducetask 进程启动之后,根据 MRAppMaster 告知的待处理数据所在位置,从若干台 maptask 运行所在机器上获取到若干个 maptask 输出结果文件,并在本地进行重新归并排序, 然后按照相同 key 的 KV 为一个组,调用客户定义的 reduce()方法进行逻辑运算,并收集运 算输出的结果 KV,然后调用客户指定的 OutputFormat 将结果数据输出到外部存储

2.3、MapTask 并行度决定机制

将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多 个 split),然后每一个 split 分配一个 mapTask 并行实例处理

这段逻辑及形成的切片规划描述文件,是由 FileInputFormat 实现类的 getSplits()方法完成的。 该方法返回的是 List,InputSplit 封装了每一个逻辑切片的信息,包括长度和位置 信息,而 getSplits()方法返回一组 InputSplit。

2.4、切片机制

FileInputFormat 中默认的切片机制

  1. 简单地按照文件的内容长度进行切片
  2. 切片大小,默认等于 block 大小
  3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

File1.txt 200M

File2.txt 100M

经过 getSplits()方法处理之后,形成的切片信息是:

File1.txt-split1 0-128M

File1.txt-split2 129M-200M

File2.txt-split1 0-100M

FileInputFormat 中切片的大小的参数配置

1
2
3
4
5
6
7
8
9
10
11
// 通过分析源码,在 FileInputFormat 中,计算切片大小的逻辑:
long splitSize = computeSplitSize(blockSize, minSize, maxSize),翻译一下就是求这三个值的中 间值

// 切片主要由这几个值来运算决定:
blocksize:默认是 128M,可通过 dfs.blocksize 修改
minSize:默认是 1,可通过 mapreduce.input.fileinputformat.split.minsize 修改
maxsize:默认是 Long.MaxValue,可通过 mapreduce.input.fileinputformat.split.maxsize 修改

//因此,如果 maxsize 调的比 blocksize 小,则切片会小于 blocksize; 如果 minsize 调的比 blocksize 大,则切片会大于 blocksize

// 但是,不论怎么调参数,都不能让多个小文件“划入”一个 split

2.5、MapTask 并行度经验之谈

如果硬件配置为 2*12core + 64G,恰当的 map 并行度是大约每个节点 20-100 个 map,最好 每个 map 的执行时间至少一分钟。

  1. 如果 job 的每个 map 或者 reduce task 的运行时间都只有 30-40 秒钟,那么就减少该 job 的 map 或者 reduce 数,每一个 task(map|reduce)的 setup 和加入到调度器中进行调度,这个 中间的过程可能都要花费几秒钟,所以如果每个 task 都非常快就跑完了,就会在 task 的开 始和结束的时候浪费太多的时间。

配置 task 的 JVM 重用可以改善该问题:

  • mapred.job.reuse.jvm.num.tasks,默认是 1,表示一个 JVM 上最多可以顺序执行的 task 数目(属于同一个 Job)是 1。也就是说一个 task 启一个 JVM。
  • 这个值可以在 mapred-site.xml 中进行更改,当设置成多个,就意味着这多个 task 运行在同一个 JVM 上,但不是同时执行, 是排队顺序执行
  1. 如果 input 的文件非常的大,比如 1TB,可以考虑将 hdfs 上的每个 blocksize 设大,比如 设成 256MB 或者 512MB

2.6、ReduceTask 并行度决定机制

reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由 切片数决定不同,Reducetask 数量的决定是可以直接手动设置:

1
2
// 设置 ReduceTask 的并行度
job.setNumReduceTasks(4);

默认值是 1,

手动设置为 4,表示运行 4 个 reduceTask,

设置为 0,表示不运行 reduceTask 任务,也就是没有 reducer 阶段,只有 mapper 阶段


如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜

注意:reducetask 数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有 1 个 reducetask

尽量不要运行太多的 reducetask。对大多数 job 来说,最好 rduce 的个数最多和集群中的 reduce 持平,或者比集群的 reduce slots 小。这个对于小集群而言,尤其重要。

最好的ReduceTask 个数是:datanode 个数 * 0.75~0.95 左右

2.7 客户端提交 MR 程序 Job 的流程


3. 昨日复习

1.MapReduce 的 wc 编程

  • 手写代码
    • Mapper
    • Reducer
    • Driver

2.MapTask 的并行度

  • 在程序执行的时候运行的 maptask 的总个数

3.ReduceTask的并行度问题

  • ReduceTask 的并行度设置依赖于自己传入的参数
  • 一般经验: ReduceTask 的个数应该 = datanode 的阶段数 * (0.75~0.95)
  • ReduceTask 在设置的时候的并行度有一定的瓶颈
  • 分区: 决定 ReduceTask 中的数据怎么分配的
    • 默认分区方式
    • 自定义分区
如果帮到你, 可以给我赞助杯咖啡☕️
0%