求微博共同粉丝
题目
涉及知识点: 多 Job 串联
1 | A:B,C,D,F,E,O |
以上是数据:
A:B,C,D,F,E,O
表示:A用户 关注B,C,D,E,F,O求所有两两用户之间的共同关注对象
答案:
1 | package com.rox.mapreduce.mr3._01_多Job串联; |
求学生成绩
题目
1 | computer,huangxiaoming,85,86,41,75,93,42,85 |
一、数据解释
数据字段个数不固定:
第一个是课程名称,总共四个课程,computer,math,english,algorithm,
第二个是学生姓名,后面是每次考试的分数二、统计需求:
1、统计每门课程的参加考试人数和课程平均分2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件
3、求出每门课程参考学生成绩最高平均分的学生的信息:课程,姓名和平均分
答案
第1小题
统计每门课程的参考人数和课程平均分
涉及知识点: 去重, 自定义类
1 | // ScoreBean |
第2小题
统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件
涉及知识点: 分区, 字符串组合key, Partitioner
1 | package com.rox.mapreduce.mr3._02_分组组件; |
第3小题
求出 每门课程①参与考试的学生成绩 最高平局分② 的学生的信息:课程,姓名和平均分
解题思路:
- 通过题意得出2个结论
- 课程要分组
- 平均分要排序
- 排序的话,交给key来做无疑是最好的,因为MapReduce会自动对key进行分组&排序
- 因此可以把
课程&平均分
作为一个联合key - 为了操作方便,可以封装到一个对象中去: ScoreBean
- 分组和排序需要在 ScoreBean重写的
compareTo()
方法中完成 - 因为最后结果是求每门课程的最高平均分,因此需要对课程进行分组。
- 此时原本的默认分组(以Bean对象整体分组)就不管用了,需要自定义分组
- 自定义分组要继承
WritableComparator
,重写compare()
方法,指定分组的规则。 - ScoreBean先按照组别进行排序,到reduce中时,已经是按照组,排好的数据,MyGroup 会把相同的比较结果放到同一个组中,分发到reduce.
- reduce中,只需要取出每组的第一个元素输出到上下文即可
图示
涉及知识点: mr中key的作用,自定义对象的用法,自定义分组,mr的执行流程
- 利用“班级和平均分”作为 key,可以将 map 阶段读取到的所有学生成绩数据按照班级 和成绩排倒序,发送到 reduce
- 在 reduce 端利用 GroupingComparator 将班级相同的 kv 聚合成组,然后取第一个即是最 大值
先贴个结论:
执行流程结论
- map每读一行就 write 到 context 一次,按照指定的
key
进行分发 map 把所有的数据都读完了之后,大概执行到
67%
的时候,开始进入CustomBean
,执行CustomBean
的compareTo()
方法,会按照自己写的规则一条一条数据比较上述都比较完毕之后,map阶段就结束了,此时来到了 reduce阶段,但是是到了
67%
了- 到了reduce阶段,直接进入了MyGroup中自定义的compare方法。
- MyGroup的
compare()
方法,如果返回非0, 就会进入 reduce 方法写出到context
MyGroup进入Reduce的条件是
- MyReduce中,如果compare的结果不等于0,也就是比较的2者不相同, 此时就进入Reduce, 写出到上下文
- 如果相同,会一直往下读,直到读到不同的, 此时写出读到上下文。
- 因为MyGroup会在Reduce阶段执行,而
CustomBean
中的compareTo()
是在map阶段执行,所以需要在CustomBean
中就把组排好序,此时分组功能才能正常运作
指定分组类MyGroup和不指定的区别
指定与不指定是指:在Driver类中,是否加上job.setGroupingComparatorClass(MyGrouper.class);
这一句。
- 指定分组类:
- 会按照分组类中,自定义的
compare()
方法比较,相同的为一组,分完一组就进入一次reduce方法
- 会按照分组类中,自定义的
- 不指定分组类:(目前存疑)
- 是否是按照key进行分组
- 如果是自定义类为key,是否是按照此key中值相同的分为一组
- 如果是hadoop内置类,是否是按照此类的值分组(Text-String的值,IntWritable-int值等..)
- 依然是走得以上这套分组逻辑,一组的数据读完才进入到Reduce阶段做归并
代码
1 | // ScoreBean2 |
MR实现两个表的数据关联Join
题目
订单数据表t_order: flag=0
id date pid amount
1001 20150710 P0001 2
1002 20150710 P0001 3
1003 20150710 P0002 3
Id:数据记录id
Date 日期
Pid 商品id
Amount 库存数量6.商品信息表t_product flag=1
pid name category_id price
P0001 小米5 C01 2000
P0002 锤子T1 C01 3500mr实现两个表的数据关联
id pid date amount name category_id price
答案1 : Reducer 端 实现 Join
思路
map端
读取到当前路径下,所有文件的切片信息, 根据文件名判断是那张表
在setup中,从文件切片中获取到文件名
1
2
3
4
5
6// 获取读取到的切片相关信息,一个切片对应一个 maptask
InputSplit inputSplit = context.getInputSplit();
// 转换为文件切片
FileSplit fs = (FileSplit)inputSplit;
// 获取文件名
filename = fs.getPath().getName();这里总共会获得2个文件名(指定目录存了2个指定文件),一个文件名对应一个切片
关联字段作为key, 其它的作为value,在value前面加上当前文件的名称标记
reduce端
- 通过标记区分两张表,把读取到的信息,分别存入2个list中
- 遍历大的表,与小表进行拼接(小表的相同pid记录只会有一条)
- 拼接完成后即可写出
代码
1 | package com.rox.mapreduce.mr3._03_join2表的数据关联; |
※ 答案2 : Mapper 端实现 Join
※
思路
- 创建job的时候,把小表加入缓存 在map的setup中,
- 读取缓存中的数据, 存入一个成员变量 map中
- map方法中,只需要读一个表, 然后根据关联条件(关联key: pid)消除笛卡尔集,进行拼接
- map直接输出, 甚至都不需要reduce
注意点:
需要达成jar包运行, 直接用Eclipse会找不到缓存
jar包执行方法
1
2# 如果代码内部指定了输入输出路径,后面的/in,/out参数可以不加
hadoop jar xxxx.jar com.rox.xxx.xxxx(主方法) /in/xx /out/xx如果没有Reduce方法
main方法中,设置map的写出key,value,应该用
setOutputKeyClass
1
2
3//// 设置Map输出类 (因为这里没有Reduce, 所以这里是最终输出,一定要注意!!!)//////////
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);要设置reduce task 的个数为0
1
job.setNumReduceTasks(0);
把小文件加载到缓存中的方法
1
2////////////// 将小文件加载到缓存
job.addCacheFile(new URI("/in/joindemo/product"));
代码
1 | package com.rox.mapreduce.mr3._03_join; |