MapReduce笔记-3

1.多 Job 串联

1.概念

当程序中有多个 Job, 并且多个 job 之间相互依赖, a , job 需要依赖另一个b,job 的执行结果时候, 此时需要使用多 job 串联

2. 涉及到昨天的微博求共同粉丝题目

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K

以上是数据:
A:B,C,D,F,E,O
表示:A用户 关注B,C,D,E,F,O

求所有两两用户之间的共同关注对象

注意

  • 要写2个MapReduce, 开启2个job
  • 后一个job依赖于前一个的执行结果
  • 后一个job的输入文件路径,就是前一个job的输出路径
  • 2个job需要添加依赖

代码

参考练习-第一题-求微博共同好友

多 Job 串联部分代码

  • 基本的写到一起, job1, job2

  • JobControl对象管理多 job, 会将多个 job 当做一个组中的 job 提交, 参数指的是组名, 随意起

  • 原生的 job 要转为可控制的 job

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // 创建 JobControl 组
    JobControl jc = new JobControl("common_friend");
    // job 拿好配置, 加入 ControlledJob 管理, 变成可控制的 job
    ControlledJob ajob = new ControlledJob(job1.getConfiguration());
    ControlledJob bjob = new ControlledJob(job2.getConfiguration());
    // 添加依赖关系
    bjob.addDependingJob(ajob);
    // 添加 job进 JC
    jc.addJob(ajob);
    jc.addJob(bjob);
    // 启动线程
    Thread jobControlTread = new Thread(jc);
    jobControlTread.start();
    // 在线程完成之后关闭
    while(!jc.allFinished()) {
    Thread.sleep(500);
    }
    jobControl.stop();

2. 分组组件

map–分组–reduce

reduce 接收到的数据是按照 map 输出的 key 进行分组的, 分组的时候按照 key 相同的时候为一组, 默认都实现了 WritableComparable接口, 其中的 compareTo()方法返回为0的时候 默认为一组, 返回不为0, 则分到下一组


自定义分组使用场景: 默认的数据分组不能满足需求

一、数据解释

数据字段个数不固定:
第一个是课程名称,总共四个课程,computer,math,english,algorithm,
第二个是学生姓名,后面是每次考试的分数

二、统计需求:
1、统计每门课程的参考人数和课程平均分

2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件

3、求出每门课程参考学生成绩最高平均分的学生的信息:课程,姓名和平均分

第三题: 要求就是分组求最大值, 两件事情: 分组, 排序(shuffle)

总结:

1、利用“班级和平均分”作为 key,可以将 map 阶段读取到的所有学生成绩数据按照班级 和成绩排倒序,发送到 reduce

2、在 reduce 端利用 GroupingComparator 将班级相同的 kv 聚合成组,然后取第一个即是最 大值

具体参考:

练习-求学生成绩-第三小题


3. Reduce 中的2个坑

坑1

Iterable\只能循环遍历一次

迭代器每次循环遍历完成, 指针都会移动到最后一个

系统类型,没事

自定义类型 ,有问题?

坑2

迭代器中所有对象公用同一个地址


4. Reduce 端的 Join

牺牲效率换执行

思路:

核心: 关联条件

  • 想要在 reduce 端完成 join, 要在 reduce 端可以同时接收到两个表中的数据
  • 要保证在 Map 端进行读文件的时候, 读到2个表的数据, 并且需要对2个表的数据进行区分
  • 将2个表放在同一个目录下

Map 端

  • 读取两个表中的数据, 进行切分、发送
  • key : 公共字段–关联字段–pid
  • value: 剩下的字段, 标记数据的来源表

Reduce 端

  • 通过编辑分离出2个表的数据
  • 分别存到2个容器中(ArrayList)
  • 遍历大表,拼接小表

代码

参考练习-第三题MR实现2个表之间的Join

缺陷

1. ReduceTask 的并行度问题:

  • 建议0.95*datanode 的个数
  • 并行度不高, 性能不高

2. 容器性能

  • list 等, 不提倡, reduce 接收的数据, 可能会很大

3. ReduceTask 容易产生数据倾斜

  • 假设我们设置多个 ReduceTask, 根据分区规则, 默认 hash
  • 以 key关联条件分, ReduceTask数据倾斜, 每个 ReduceTask 分工不均, 非常影响性能,没有合理的利用集群资源
  • 在真实的生产中一定要尽量的避免数据倾斜
  • 最好的做法:将分区设计的足够完美,难度比较大
  • 因此,ReduceTask 一般不会完成 John工作
  • 放在 Map 端完成就不会有这个问题了

补充:Mapper 中的源码分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run(Context context) throws IOException, InterruptedException {
// 在 maptask 执行之前调用一次, 一个 maptask 只会调用一次。setup 中通常会帮 map 中初始化一些变量和资源, 比如数据库的连接等。
// 主要目的:减少资源的初始化次数而提升程序的性能
setup(context);
try {
// 获取文件是否还有下一行, 一行只调用一次
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
// maptask 任务执行完成之后会调用一次,一个 maptask 只会调用一次
// 帮 map 处理一些善后工作, 比如:资源的关闭
cleanup(context);
}
}

5. Map 端的 Join

注意点:这种方式只能通过 Jar 包上传的方式,直接用 Eclipse 会找不到缓存

为了提升 Map 端 Join 性能, 我们的策略是, 将小表的数据加载到每个运行的 MapTask 的内存中

如果小表被加载到了内存中, 我们每次在 Map 端只需要读取大表,当读取到大表的每一行数据,可以直接和内存中的小表进行关联。

这个时候,只需要 Map 就可以完成 Join 操作了


1. 如何将小表加入到内存中?

1
2
// 将指定路径文件加载到缓存中
job.addCacheFile(new URI("/xxx"));

2. Map 端怎样读取缓存中的数据

想要在 Java 中使用缓存中的数据,缓存中的数据必须封装到 Java 的容器中

1
2
// 获取缓存文件
context.getLocalCacheFiles()[0]

3. 代码

参考练习-第3题

代码注意点:

setup:从缓存读取一文件(多对一的一)到 HashMap

main 方法中注意点

1
2
3
4
5
// 指定文件加入缓存
job.addCacheFile(new URI("/xxx"));

// 如果没有ReduceTask, 要设置为0
job.setNumReduceTasks(0);

示范代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
// 用一个hashmap来加载保存产品信息表
Map<String, String> pdInfoMap = new HashMap<String, String>();

Text k = new Text();

/**
* 通过阅读父类Mapper的源码,发现 setup方法是在maptask处理数据之前调用一次 可以用来做一些初始化工作
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt")));
String line;
while (StringUtils.isNotEmpty(line = br.readLine())) {
String[] fields = line.split(",");
pdInfoMap.put(fields[0], fields[1]);
}
br.close();
}

// 由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String orderLine = value.toString();
String[] fields = orderLine.split("\t");
String pdName = pdInfoMap.get(fields[1]);
k.set(orderLine + "\t" + pdName);
context.write(k, NullWritable.get());
}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(MapSideJoin.class);

job.setMapperClass(MapSideJoinMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job, new Path("D:/srcdata/mapjoininput"));
FileOutputFormat.setOutputPath(job, new Path("D:/temp/output"));

// 指定需要缓存一个文件到所有的maptask运行节点工作目录
/* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中
/* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中
/* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录
/* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录

// 将产品表文件缓存到task工作节点的工作目录中去
job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));

//map端join的逻辑不需要reduce阶段,设置reducetask数量为0
job.setNumReduceTasks(0);

boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}

6. 对比

MapJoin 的方式: 大 & 小表

因为有一个表需要加载到内存中,注定加载到内存中的表不能过大(hive 中默认是256M)

大表 & 大表 如何设计

  • ReduceJoin : 解决数据倾斜的问题,合理设计分区。 —很难做到
  • 将其中一个大表进行切分,切分成小表, 最终执行 大表 & 小表

优点

  • 并行度高,不存在数据倾斜的问题,运行效率高

  • 优先选择MapJoin


:arrow_forward: 7. 排序算法 (待整理:TODO)

1. 快速排序

边界值始终是不变的。

image-20180608163614370

image-20180608163841417

image-20180608164004311

image-20180608164127167

image-20180608164318335

2. 归并排序

一般情况针对有序的多个数据集

应用场景:想到了多个Reduce 任务产生的多个文件的合并

1. 归并排序前传: 合并多个数组

image-20180608165031396

2. 归并排序 之 一个大数据集

—归———-

切分成单个的数据集

image-20180608165651868

image-20180608165727484

—-并———
  1. 两两相并, 并成新的数组, 小的先放入数组, 再放大的
  2. 新的数组再不断执行 上述的 合并多个数组

8. ※ Shuffle 过程 ※

image-20180611104416430

  • mapper 阶段处理的数据如何传递给 reducer 阶段,是 MapReduce 框架中 最关键的一个流程,这个流程就叫 Shuffle
  • Shuffle

    • 即 数据混洗 —— 核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并,排序;
    • 环形缓冲区
      • 内存中的一种首尾相连的数据结构(底层是字节数组),kvbuffer 包含原始数据区和元数据区,一个mapTask任务对应一个环形缓冲区
      • 默认大小 100M,默认阈(yu)值 0.8,即当达到 80M 后,会触发 spill溢写 操作,将数据写入磁盘,此时mapper输出会继续向剩余20M中写数据
        缓冲区大小 mapred-site.xml:mapreduce.task.io.sort.mb
        阈值mapred-site.xml:mapreduce.map.sort.spill.percent
        路径:mapred-site.xml:mapreduce.cluster.local.dir
      • 如果此80M数据写入磁盘完成前,剩余20M缓冲区也写完,则会进入阻塞状态,直到是spill完成腾出缓冲区空间
      • 赤道(equtor):环形缓冲区中原始数据和元数据的边界
        • 原始数据:mapTask输出的数据
        • 元数据
          • 记录原始数据的数据,包含4部分内容,占16*4字节;
          • 每一条元数据占用空间是一样的,排序可以通过交换元数据实现
          • 分类
            • a. 原始数据中key的起始位置
            • b. 原始数据中value的起始位置
            • c. value的长度
            • d. 分区信息,即该条信息属于哪个分区
    • 核心操作
        1. 分区 partition(如果 reduceTask 只有一个或者没有,那么 partition 将不起作用)
        1. Sort 根据 key 排序(MapReduce 编程中的 sort 是一定会做的,并且 只能按照 key排序, 当然 如果没有 reducer 阶段,那么就不会对 key 排序)
        1. Combiner 进行局部 value 的合并(Combiner 是可选的组件,作用只是为了提高任务的执行效率)
    • 详细过程

        1. 一个大文件需要处理,它在在 HDFS 上是以 block 块形式存放,每个 block 默认为 128M 存 3 份;运行时每个 map 任务会处理一个切块(split),如果 block 大和 split 相同,有多少个 block 就有多少个 map 任务;所以对整个文件处理时会有很多 map 任务进行并行计算。
        1. 每个 map 任务处理完输入的切块后会把结果写入到内存的一个 环形缓冲区,写入过程中会进行简单排序,当缓冲区的大小阀值,一个后台的线程就会启动把缓冲区中的数据溢写(spill)到本地磁盘中,同时Mapper继续时向环形缓冲区中写入数据。
        • 数据溢写入到磁盘之前,首先会根据 reducer 的数量划分成同数量的分区(partition),每个分区中的都数据会有后台线程根据 map 任务的输出结果 key 进行排序;
        • 如果有 combiner,它会在 缓冲区溢写到磁盘之前 和 mapTask排好序的输出上 运行,使写到本地磁盘和传给 reducer 的数据更少;
          Combiner即是把同一分区中的同一key的数据进行合并,整个shuffle过程会调用两个Combiner !
        • 最后在本地生成分好区且排好序的小文件。
        • 注意:如果 map 向环形缓冲区写入数据的速度大于向本地写入数据的速度,环形缓冲区会被写满,向环形缓冲区写入数据的线程会阻塞直至缓冲区中的内容全部溢写到磁盘后再次启动,到阀值后会向本地磁盘新建一个溢写文件;
        1. map 任务完成之前,会把本地磁盘溢写的所有文件 不停地 合并(merge)成得到一个结果文件,合并得到的结果文件会根据小溢写文件的分区而分区,每个分区的数据会再次根据 key 进行 排序,得到的结果文件是分好区且排好序的(可以合并成一个文件的溢写文件数量默认为10);
          默认合并溢写文件数量 mapred-site.xml:mapreduce.task.io.sort.factor
        1. reduce 任务启动,Reducer 中的一个线程定期向 MRAppMaster 询问 Mapper 输出结果文件位置,Mapper 结束后会向 MRAppMaster 汇报信息,从而 Reducer 会得知 Mapper 状态并得到 map 结果文件目录;
          reduce任务数配置
          a)mapred-site.xml:mapreduce.job.reduces
          b) job.setNumReduceTasks(num)
        1. 当有一个 Mapper 结束时,reduce 任务进入复制阶段,reduce 任务通过 http 协议(hadoop 内置了netty容器)把所有 Mapper 结果文件的 对应的分区数据 拉取(fetch)过来,Reducer 可以并行复 制 Mapper 的 结果 , 默认线程数为5; 所有 Reducer 复制完成 map 结果文件后,由于 Reducer 可能会失败,NodeManager 并不会在第一个 map 结果文件复制完成后就删除它,而是直到作业完成后 MRAppMaster 通知 NodeManager 进行删除; 另外如果 map 结果文件相当小,则会被直接复制到 reduce NodeManager 的内存;一旦缓冲区达到 reduce 的阈值大小 0.66 或 写入到 reduce NodeManager 内 存 中 文 件 个 数 达 到 map 输出阈值 1000,reduce 就会把 map 结果文件合并(merge)溢写到本地;
          默认线程数 mapred-site.xml:mapreduce.reduce.shuffle.parallelcopies
          缓冲区大小mapred-site.xml:mapreduce.reduce.shuffle.input.buffer.percent,默认0.7
          阈值:mapred-site.xml:mapreduce.reduce.shuffle.merge.percent,默认0.66
          map输出阈值1000:mapred-site.xml:mapreduce.reduce.merge.inmem.threshold
        1. 复制阶段完成后,Reducer 进入 Merge 阶段,循环地合并 map 结果文件,维持其顺序排序,合并因子默认为 10,经过不断地 Merge 后得到一个”最终文件”,可能存储在磁盘也可能存在内存中;
        1. “最终文件”输入到 reduce 进行计算,计算结果输入到 HDFS。
      • [ 注意 ]

        • 溢写前会先按照分区进行排序,再按key进行排序,采用 快速排序
          排序是按照原始数据排序,但是由于原始数据不好移动且原始数据包含了原始数据的位置信息,所以移动的其实是元数据;写入时读的是元数据,真正写入的时原始数据
        • 最后的数据如果不够80M,也会被强制flush到磁盘
        • 每个mapTask任务生成的磁盘小数据最后都会merge成一个大文件,采用 归并排序
        • Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁 盘io的次数越少,执行速度就越快。


10、自定义输入 InputFormat

  • 默认的文件加载:TextInputFormat
  • 默认的文件读取:LineRecordReader
  • 源码追踪过程:context –> mappercontext –> mapcontext –> reader –> input –> real –> inputFormat.createRecordReader(split,taskContext),然后查找 inputFormat –> createRecordReader(split,taskContext),inputFormat –> TextInputFormat实例对象
  • 案例:多个小文件合并 word1.txt ~word10.txt 每次读取一个小文件
    • 自定义输入,需要创建两个类,并通过Job对象指定自定义输入
    • \1. 创建XxxInputFormat类,继承FileInputFormat<>,重写 createRecordReader() 方法
    • \2. 创建XxxRecordReader类,继承RecordReader<>,重写以下方法:
      • initialize():初始化方法,类似于setup(),对属性、链接或流进行初始化
      • getCurrentKey():返回key
      • getCurrentValue():返回value
      • getProgress():返回文件执行进度
      • nextKeyValue():返回文件是否读取结束
      • close():进行一些资源的释放
    • \3. 在mapreduce类的main()方法中指定自定义输入:job.setInputFormatClass(XxxInputFormat.class);

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.rox.mapreduce.mr4._02_inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {

/**
* 设置每个小文件不可分片,保证一个小文件生成一个key-v键值对
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}


@Override
public RecordReader<NullWritable, Text> createRecordReader(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.rox.mapreduce.mr4._02_inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


public class WholeFileRecordReader extends RecordReader<NullWritable, Text>{

private FileSplit fileSplit;
private Configuration conf;
private Text value = new Text();
private boolean processed = false; // 标识文件是否读取完成

/**
* 初始化方法
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit=(FileSplit)split;
this.conf = context.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
byte[] contents = new byte[(int)fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
// 把输入流上的数据全部读取到contents字节数组中
IOUtils.readFully(in, contents, 0, contents.length);
// 把读取到的数据设置到value里
value.set(contents,0,contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}

@Override
public NullWritable getCurrentKey()
throws IOException, InterruptedException {
return NullWritable.get();
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f : 0.0f;
}

@Override
public void close() throws IOException {
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.rox.mapreduce.mr4._02_inputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SmallFilesConvertToBigMR extends Configured implements Tool {

public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SmallFilesConvertToBigMR(), args);
System.exit(exitCode);
}

@Override
public int run(String[] args) throws Exception {

Configuration conf = new Configuration();

conf.set("fs.defaultFS", "hdfs://cs1:9000");

System.setProperty("HADOOP_USER_NAME", "ap");

Job job = Job.getInstance(conf, "combine small files to bigfile");

job.setJarByClass(SmallFilesConvertToBigMR.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(Text.class);

job.setMapperClass(SmallFilesConvertToBigMRMapper.class);

job.setReducerClass(SmallFilesConvertToBigMRReducer.class);

job.setOutputKeyClass(NullWritable.class);

job.setOutputValueClass(Text.class);
////////
job.setInputFormatClass(WholeFileInputFormat.class);

// job.setOutputFormatClass(SequenceFileOutputFormat.class);

Path input = new Path("/in/joindemo");

Path output = new Path("/out/bigfile");

FileInputFormat.setInputPaths(job, input);

FileSystem fs = FileSystem.get(conf);

if (fs.exists(output)) {

fs.delete(output, true);

}

FileOutputFormat.setOutputPath(job, output);

int status = job.waitForCompletion(true) ? 0 : 1;

return status;
}


static class SmallFilesConvertToBigMRMapper
extends Mapper<NullWritable, Text, Text, Text> {

private Text filenameKey;

@Override
protected void setup(
Mapper<NullWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {

InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
filenameKey = new Text(path.toString());
}

@Override
protected void map(NullWritable key, Text value,
Mapper<NullWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
context.write(filenameKey, value);
}
}

static class SmallFilesConvertToBigMRReducer
extends Reducer<Text, Text, NullWritable, Text> {
@Override

protected void reduce(Text filename, Iterable<Text> bytes,

Context context) throws IOException, InterruptedException {

context.write(NullWritable.get(), bytes.iterator().next());

}
}
}


11、自定义输出 OutputFormat

  • 默认的文件加载:TextOutputFormat
  • 默认的文件读取:LineRecordWriter
  • 源码追踪过程 略
  • 案例:将考试成绩合格的输出到一个文件夹,不及格的输出到另一个文件夹(注意,不同于分区,分区只是量结果输出到同一文件夹下不同文件)
    • 自定义输出,需要创建两个类,并通过Job对象指定自定义输入
    • \1. 创建XxxOutputFormat类,继承FileOutputFormat<>,重写getRecordWriter()方法
    • \2. 创建XxxRecordWriter类,继承RecordWriter<>,重写以下方法:
      • write():真正向外写出的方法,需要将结果输出到几个不同文件夹,就需要创建几个输出流
        • 而输出流通过FileSystem对象获取,FileSystem对象获取需要配置文件
        • 一般可以通过构造方法直接传入FileSystem对象
      • close():释放资源
    • \3. 在mapreduce类的main()方法中指定自定义输入:job.setOutputFormatClass(XxxOutputFormat.class);

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.rox.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MultipleOutputMR {
public static void main(String[] args) throws Exception {
// 指定HDFS相关参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://cs1:9000");
System.setProperty("HADOOP_USER_NAME", "ap");

//  创建/配置 Job
Job job = Job.getInstance(conf);

// 设置Jar包类型:这里千万别写错了
job.setJarByClass(MultipleOutputMR.class);

// 设置Map Reduce执行类
job.setMapperClass(MultipleOutputMRMapper.class);

// 设置Map输出类
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

////////////// 设置reduce执行个数为0
job.setNumReduceTasks(0);

///////////// 设置MapOutputFormatClass
job.setOutputFormatClass(MyOutputFormat.class);

// 设置输入 输出路径
String inP = "/in/newScoreIn";
String outP = "/out/myoutformat/mulWriteSuc";
FileInputFormat.setInputPaths(job, new Path(inP));
FileOutputFormat.setOutputPath(job, new Path(outP));

// 设置如果存在路径就删除
Path mypath = new Path(outP);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.exists(mypath)) {
hdfs.delete(mypath, true);
}
//  执行job
System.exit(job.waitForCompletion(true)?0:-1);
}


static class MultipleOutputMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {
// 参考次数>7次 算合格
String[] splits = value.toString().split(",");
if (splits.length > 9) {
context.write(new Text("1::"+value.toString()), NullWritable.get());
}else {
context.write(new Text("2::"+value.toString()), NullWritable.get());
}
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.rox.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

@Override
public RecordWriter<Text, NullWritable> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {

Configuration configuration = job.getConfiguration();
FileSystem fs = FileSystem.get(configuration);

Path p1 = new Path("/out/myoutformat/out1");
Path p2 = new Path("/out/myoutformat/out2");

FSDataOutputStream out1 = fs.create(p1);
FSDataOutputStream out2 = fs.create(p2);

return new MyRecordWriter(out1,out2);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.rox.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {

FSDataOutputStream fsout = null;
FSDataOutputStream fsout1 = null;

public MyRecordWriter(FSDataOutputStream fsout, FSDataOutputStream fsout1) {
super();
this.fsout = fsout;
this.fsout1 = fsout1;

}


@Override
public void write(Text key, NullWritable value)
throws IOException, InterruptedException {
String[] strs = key.toString().split("::");
if (strs[0].equals("1")) {
fsout.write((strs[1]+"\n").getBytes());
}else {
fsout1.write((strs[1]+"\n").getBytes());
}
}

@Override
public void close(TaskAttemptContext context)
throws IOException, InterruptedException {
IOUtils.closeStream(fsout);
IOUtils.closeStream(fsout1);
}

}

12. 倒排索引建立

概念:

倒排索引(Inverted Index),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的数据结构。了解详情可自行百度

需求:有大量的文本(文档、网页),需要建立搜索索引 , 要求统计每一个单词在每个文件中出现的次数, 如下

思路

  1. 先根据单词&文件名为 key, 输出一个文件
  2. 拆分文件行, 以单词为 key, value 再拼接

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//  InverIndexStepOne 

package cn.itcast.bigdata.mr.inverindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverIndexStepOne {

static class InverIndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

Text k = new Text();
IntWritable v = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String[] words = line.split(" ");

FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fileName = inputSplit.getPath().getName();
for (String word : words) {
k.set(word + "--" + fileName);
context.write(k, v);

}

}

}

static class InverIndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int count = 0;
for (IntWritable value : values) {

count += value.get();
}

context.write(key, new IntWritable(count));

}

}

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);
job.setJarByClass(InverIndexStepOne.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, new Path("D:/srcdata/inverindexinput"));
FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.setMapperClass(InverIndexStepOneMapper.class);
job.setReducerClass(InverIndexStepOneReducer.class);
job.waitForCompletion(true);
}
}


//  IndexStepTwo 

package cn.itcast.bigdata.mr.inverindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexStepTwo {
public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] files = line.split("--");
context.write(new Text(files[0]), new Text(files[1]));
}
}
public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (Text text : values) {
sb.append(text.toString().replace("\t", "-->") + "\t");
}
context.write(key, new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception {

if (args.length < 1 || args == null) {
args = new String[]{"D:/temp/out/part-r-00000", "D:/temp/out2"};
}

Configuration config = new Configuration();
Job job = Job.getInstance(config);

job.setMapperClass(IndexStepTwoMapper.class);
job.setReducerClass(IndexStepTwoReducer.class);
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 1:0);
}
}

13、Yarn

1、Yarn图示简介

image-20180611161948431

在Hadoop1.x时, 只有两个主要组件:hdfs(文件存储), MapReduce(计算)

image-20180611162800671

所有的计算相关的全部放在MapReduce上

  • JobTracker: 整个计算程序的老大
    • 资源调度:随机调度
    • 监控程序运行的状态,启动运行程序
    • 存在单点故障问题
  • TaskTracker:负责计算程序的执行
    • 强行的将计算资源分成2部分
      • MapSlot
      • ReduceSlot
    • 每一部分资源只能跑对应的任务
  • 缺陷:
    • 单点故障
    • 资源调度随机,会造成资源浪费
    • JobTracker的运行压力过大


Hadoop2.x: 分理出Yarn,专门负责集群的资源管理和调度

image-20180611162010610

Yarn的进程:

ResourceManager:

  • 整个资源调度的老大

    • 接受hadoop客户端的请求
    • 接受NodeManager 的状态报告, NM的资源状态和存活状态
    • 资源调度,整个计算程序的资源调度,调度的运行资源和节点
  • 内部组件

    • ASM——ApplicationsManager
      • 所有应用程序的管理者,负责调度应用程序
    • Scheduler——调度器概念
      • 调度的是什么时候执行哪个计算程序
      • 调度器:
        • FIFO: first in first out
          • 先提交的先执行,后提交的后执行
          • 内部维护一个队列
        • FAIR: 公平调度器
          • 大家平分资源运行
          • 假设刚开始只有一个任务,占资源100%,此时又来了一个任务,这是进行资源平分,每人50%
          • 内部也是维护一个队列
        • CAPACITY: 可以按需进行配置,使用资源
          • 内部可维护多个队列,多个队列之间可以进行资源分配
          • 例如:分配两个队列
            • 队列1:60%
            • 队列2:40%
            • 每个队列中都是执行FIFO的

NodeManager:

  • 负责真正的提供资源,运行计算程序
    • 接受ResourceManager的命令
    • 提供资源运行计算程序

MRAppMaster:

  • 单个计算程序的老大, 类似于项目经理
    • 负责帮助当前计算程序向ResourceManager申请资源
    • 负责启动 MapTask 和 ReduceTask 任务

Container:

  • 抽象资源容器,封装这一定的cpu,io 和网络资源(逻辑概念)
  • 是运行MapTask,ReduceTask等的运行资源单位
  • 1个split —— 1个MapTask (ReduceTask) —— 1个Container —— 显示为YarnChild,底层运行的资源单位就是Container

2、Yarn运行过程

image-20180612093413719

  • MRAppMaster会在所有的MapTask执行到0.8的时候,开启ReduceTask任务

YARN 作业执行流程:

  1. 用户向 YARN 中提交应用程序,其中包括 MRAppMaster 程序,启动 MRAppMaster 的命令, 用户程序等。
  2. ResourceManager 为该程序分配第一个 Container,并与对应的 NodeManager 通讯,要求 它在这个 Container 中启动应用程序 MRAppMaster。
  3. MRAppMaster 首先向 ResourceManager 注册,这样用户可以直接通过 ResourceManager 查看应用程序的运行状态,然后将为各个任务申请资源,并监控它的运行状态,直到运行结 束,重复 4 到 7 的步骤。
  4. MRAppMaster 采用轮询的方式通过 RPC 协议向 ResourceManager 申请和领取资源。
  5. 一旦 MRAppMaster 申请到资源后,便与对应的 NodeManager 通讯,要求它启动任务。
  6. NodeManager 为任务设置好运行环境(包括环境变量、JAR 包、二进制程序等)后,将 任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
  7. 各个任务通过某个 RPC 协议向 MRAppMaster 汇报自己的状态和进度,以让 MRAppMaster 随时掌握各个任务的运行状态,从而可以在任务败的时候重新启动任务。 8、应用程序运行完成后,MRAppMaster 向 ResourceManager 注销并关闭自己。

3、Job的提交过程(待整理)

  • 客户端向rm发送 提交job请求
  • rm向客户端发送 共享资源路径 和 applicationId
  • 客户端将程序运行需要的共享资源放进共享资源路径
    包括:程序jar包,xml配置文件,split切片信息
  • 客户端向rm发送资源放置成功的报告,并真正 提交应用程序
  • rm接收到客户端的请求,会返回一个空闲的资源节点(比如:node01)
  • 到资源节点(node01)上启动container
  • 启动MRAppMaster
  • 创建作业簿 记录 maptask 和 reducetask 的 运行状态和进度 等信息
  • mrappmaster去共享资源路径下 ,获取 切片 和 配置文件 等信息
  • mrappmaster 向 rm 申请maptask 和 reducetask的资源
  • rm 在处理 mrappmaster 请求时,会 优先处理有关maptask的请求
  • rm 向 mrappmaster 返回空闲节点(数据本地优先原则),运行maptask 或 reducetask
    优先返回有数据的节点。
  • 对象节点需要到hdfs 共享路径下下载程序jar包等 共享资源 到本地
  • mrappmaster 到 对应的节点上,启动container 和 maptask
  • maptask需要向 mrappmaster 汇报自身的 运行状态和进度
  • mrappmaster 监控到所有的maptask 运行进度到 80%,启动reducetask(启动前,也会下载共享资源路径下的响应文件,程序jar包,配置文件等)
  • reducetask 时刻和 mrappmaster 通信,汇报自身的 运行状态和进度
  • 整个运行过程中,maptask运行完成 , 都会向mrappmaster 申请注销 自己
  • 当所有的maptask 和 reducetask 运行完成 , mrappmaster 就会向rm 申请注销,进行资源回收

4.MapReduce&yarn的工作机制–job提交流程–吸星大法

image-20180617232031245

疑点: MRAppMaster 的数量问题

  • 经过研究, 应该是每个 job 会产生一个 MRAppMaster

5.MapReduce全原理剖析—六脉神剑

全图

image-20180618095514310

细节图

image-20180618095415768

image-20180618095444648

6. maptask任务分配切片机制

7. 完整流程图

14. 关于 Driver 配置的注意点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* 相当于一个yarn集群的客户端
* 需要在此封装我们的mr程序的相关运行参数,指定jar包
* 最后提交给yarn
* @author
*
*/
public class WordcountDriver {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

//是否运行为本地模式,就是看这个参数值是否为local,默认就是local
/*conf.set("mapreduce.framework.name", "local");*/

//本地模式运行mr程序时,输入输出的数据可以在本地,也可以在hdfs上
//到底在哪里,就看以下两行配置你用哪行,默认就是file:///
/*conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
/*conf.set("fs.defaultFS", "file:///");*/



//运行集群模式,就是把程序提交到yarn中去运行
//要想运行为集群模式,以下3个参数要指定为集群上的值
/*conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "mini1");
conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
Job job = Job.getInstance(conf);

job.setJar("c:/wc.jar");
//指定本程序的jar包所在的本地路径
/*job.setJarByClass(WordcountDriver.class);*/

//指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);

//指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

//指定需要使用combiner,以及用哪个类作为combiner的逻辑
/*job.setCombinerClass(WordcountCombiner.class);*/
job.setCombinerClass(WordcountReducer.class);

//如果不设置InputFormat,它默认用的是TextInputformat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

//指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job的输出结果所在目录
FileOutputFormat.setOutputPath(job, new Path(args[1]));


// 指定需要缓存一个文件到所有的maptask运行节点工作目录
/* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中
/* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中
/* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录
/* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录

// 将产品表文件缓存到task工作节点的工作目录中去
job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));

//map端join的逻辑不需要reduce阶段,设置reducetask数量为0
job.setNumReduceTasks(0);

//将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
/*job.submit();*/
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}

15. GroupingComparator 分组排序

见练习 或者github

image-20180618184145447

注意点: 分区的数量


1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;


public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{

@Override
public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
//相同id的订单bean,会发往相同的partition
//而且,产生的分区数,是会跟用户设置的reduce task数保持一致
return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean>{

private Text itemid;
private DoubleWritable amount;

public OrderBean() {
}

public OrderBean(Text itemid, DoubleWritable amount) {
set(itemid, amount);

}

public void set(Text itemid, DoubleWritable amount) {

this.itemid = itemid;
this.amount = amount;

}
public Text getItemid() {
return itemid;
}

public DoubleWritable getAmount() {
return amount;
}

@Override
public int compareTo(OrderBean o) {
int cmp = this.itemid.compareTo(o.getItemid());
if (cmp == 0) {
cmp = -this.amount.compareTo(o.getAmount());
}
return cmp;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(itemid.toString());
out.writeDouble(amount.get());

}

@Override
public void readFields(DataInput in) throws IOException {
String readUTF = in.readUTF();
double readDouble = in.readDouble();

this.itemid = new Text(readUTF);
this.amount= new DoubleWritable(readDouble);
}
@Override
public String toString() {
return itemid.toString() + "\t" + amount.get();
}
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package cn.itcastcat.bigdata.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* 利用reduce端的GroupingComparator来实现将一组bean看成相同的key
* @author duanhaitao@itcast.cn
*
*/
public class ItemidGroupingComparator extends WritableComparator {

//传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
protected ItemidGroupingComparator() {
super(OrderBean.class, true);
}


@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean abean = (OrderBean) a;
OrderBean bbean = (OrderBean) b;
//比较两个bean时,指定只比较bean中的orderid
return abean.getItemid().compareTo(bbean.getItemid(
}
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sun.xml.bind.v2.schemagen.xmlschema.List;

/**
*
* @author duanhaitao@itcast.cn
*
*/
public class SecondarySort {

static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{

OrderBean bean = new OrderBean();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();
String[] fields = StringUtils.split(line, ",");

bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));

context.write(bean, NullWritable.get());

}

}

static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{


//到达reduce时,相同id的所有bean已经被看成一组,且金额最大的那个一排在第一位
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}


public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(SecondarySort.class);

job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);


job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);

FileInputFormat.setInputPaths(job, new Path("c:/wordcount/gpinput"));
FileOutputFormat.setOutputPath(job, new Path("c:/wordcount/gpoutput"));

//在此设置自定义的Groupingcomparator类
job.setGroupingComparatorClass(ItemidGroupingComparator.class);
//在此设置自定义的partitioner类
job.setPartitionerClass(ItemIdPartitioner.class);

job.setNumReduceTasks(2);

job.waitForCompletion(true);

}
}

如果帮到你, 可以给我赞助杯咖啡☕️
0%