MapReduce笔记-练习

求微博共同粉丝

题目

涉及知识点: 多 Job 串联

1
2
3
4
5
6
7
8
9
10
11
12
13
14
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K

以上是数据:
A:B,C,D,F,E,O
表示:A用户 关注B,C,D,E,F,O

求所有两两用户之间的共同关注对象

答案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package com.rox.mapreduce.mr3._01_多Job串联;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CommonFansDemo {
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
// Job 逻辑

// 指定 HDFS 相关的参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://cs1:9000");
System.setProperty("HADOOP_USER_NAME", "ap");
//
// 新建一个 job1
Job job1 = Job.getInstance(conf);

// 设置 Jar 包所在路径
job1.setJarByClass(CommonFansDemo.class);

// 指定 mapper 类和 reducer 类
job1.setMapperClass(MyMapper_Step1.class);
job1.setReducerClass(MyReducer_Step1.class);

// 指定 maptask 的输出类型
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(Text.class);

// 指定最终的输出类型(reduce存在时,就是指 ReduceTask 的输出类型)
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);

// 指定该 MapReduce 程序数据的输入输出路径
FileInputFormat.setInputPaths(job1, new Path("/in/commonfriend"));
FileOutputFormat.setOutputPath(job1, new Path("/out/job1"));
//
// 新建一个 job2
Job job2 = Job.getInstance(conf);

// 设置 Jar 包所在路径
job2.setJarByClass(CommonFansDemo.class);

// 指定 mapper 类和 reducer 类
job2.setMapperClass(MyMapper_Step2.class);
job2.setReducerClass(MyReducer_Step2.class);

// 指定 maptask 的输出类型
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(Text.class);

// 指定最终的输出类型(reduce存在时,就是指 ReduceTask 的输出类型)
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);

// 指定该 MapReduce 程序数据的输入输出路径
FileInputFormat.setInputPaths(job2, new Path("/out/job1"));
FileOutputFormat.setOutputPath(job2, new Path("/out/job2"));

//

/**
* 将多个 job 当做一个组中的 job 提交, 参数名是组名
* 注意: JobControl 是实现了 Runnable 接口的
*/
JobControl jControl = new JobControl("common_friend");
// 将原生的 job携带配置 转换为可控的 job
ControlledJob aJob = new ControlledJob(job1.getConfiguration());
ControlledJob bJob = new ControlledJob(job2.getConfiguration());
// 添加依赖关系
bJob.addDependingJob(aJob);
// 添加 job 到组中
jControl.addJob(aJob);
jControl.addJob(bJob);
// 启动一个线程
Thread jobThread = new Thread(jControl);
jobThread.start();
while (!jControl.allFinished()) {
Thread.sleep(500);
}
jobThread.stop();
}

static class MyMapper_Step1 extends Mapper<LongWritable, Text, Text, Text> {

String[] user_attentions;
String[] attentions;
Text k = new Text();
Text v = new Text();

@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {

user_attentions = value.toString().split(":");
attentions = user_attentions[1].trim().split(",");

for (String att : attentions) {
k.set(att);
v.set(user_attentions[0].trim());
context.write(k, v);
}
}
}

/**
* @author shixuanji
* 将两两粉丝(普通用户)拼接起来, 格式a-f:c => a,b 都共同关注了 c
*
* A F,I,O,K,G,D,C,H,B
B E,J,F,A
C B,E,K,A,H,G,F
D H,C,G,F,E,A,K,L
E A,B,L,G,M,F,D,H
F C,M,L,A,D,G
*/
static class MyMapper_Step2 extends Mapper<LongWritable, Text, Text, Text> {

String[] attenion_users;
String[] users;
Text k = new Text();
Text v = new Text();

@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
attenion_users = value.toString().split("\t");
users = attenion_users[1].trim().split(",");
for (String u1 : users) {
for (String u2 : users) {
if (u1.compareTo(u2) < 0) {
String users = u1 + "-" + u2;
k.set(users);
v.set(attenion_users[0].trim());
context.write(k, v);
}
}
}
}
}

/**
* @author shixuanji
* 需要统计的是, 某人拥有的全部粉丝
* key: 传过来的 key
* value: 用,分割
*/
static class MyReducer_Step1 extends Reducer<Text, Text, Text, Text> {

Text k = new Text();
Text v = new Text();

@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {

// 注意: 这里 sb 不能写在外面,会不断的拼接
StringBuilder sb = new StringBuilder();
for (Text v : values) {
sb.append(v.toString()).append(",");
}
k.set(key);
v.set(sb.substring(0, sb.length() - 1));
context.write(k, v);
}
}

/**
* @author shixuanji
* 拿到的数据: a-b c
*/
static class MyReducer_Step2 extends Reducer<Text, Text, Text, Text> {

Text k = new Text();
Text v = new Text();

@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text attention : values) {
sb.append(attention.toString()).append(",");
}
k.set(key);
v.set(sb.substring(0, sb.length() - 1));
context.write(k, v);
}
}
}



// job1的输出
A F,I,O,K,G,D,C,H,B
B E,J,F,A
C B,E,K,A,H,G,F
D H,C,G,F,E,A,K,L
E A,B,L,G,M,F,D,H
F C,M,L,A,D,G
G M
H O
I O,C
J O
K O,B
L D,E
M E,F
O A,H,I,J,F


// job2的输出
A-B E,C
A-C D,F
A-D F,E
A-E C,D,B
A-F O,B,E,D,C
A-G E,F,D,C
A-H O,E,D,C
A-I O
A-J B,O
A-K D,C
A-L D,F,E
A-M E,F
B-C A
B-D E,A
B-E C
B-F A,E,C
B-G C,A,E
B-H A,E,C
B-I A
B-K C,A
B-L E
B-M E
B-O A,K
C-D A,F
C-E D
C-F D,A
C-G F,A,D
C-H D,A
C-I A
C-K A,D
C-L F,D
C-M F
C-O I,A
D-E L
D-F E,A
D-G A,F,E
D-H E,A
D-I A
D-K A
D-L F,E
D-M F,E
D-O A
E-F C,B,M,D
E-G C,D
E-H C,D
E-J B
E-K D,C
E-L D
F-G A,D,C,E
F-H A,E,C,D,O
F-I O,A
F-J O,B
F-K C,A,D
F-L E,D
F-M E
F-O A
G-H A,C,D,E
G-I A
G-K C,A,D
G-L D,E,F
G-M F,E
G-O A
H-I O,A
H-J O
H-K A,D,C
H-L E,D
H-M E
H-O A
I-J O
I-K A
I-O A
K-L D
K-O A
L-M F,E

求学生成绩

题目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
computer,huangxiaoming,85,86,41,75,93,42,85
computer,xuzheng,54,52,86,91,42
computer,huangbo,85,42,96,38
english,zhaobenshan,54,52,86,91,42,85,75
english,liuyifei,85,41,75,21,85,96,14
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
english,zhouqi,85,86,41,75,93,42,85,75,55,47,22
english,huangbo,85,42,96,38,55,47,22
algorithm,liutao,85,75,85,99,66
computer,huangzitao,85,86,41,75,93,42,85
math,wangbaoqiang,85,86,41,75,93,42,85
computer,liujialing,85,41,75,21,85,96,14,74,86
computer,liuyifei,75,85,62,48,54,96,15
computer,liutao,85,75,85,99,66,88,75,91
computer,huanglei,76,95,86,74,68,74,48
english,liujialing,75,85,62,48,54,96,15
math,huanglei,76,95,86,74,68,74,48
math,huangjiaju,85,75,86,85,85,74,86
math,liutao,48,58,67,86,15,33,85
english,huanglei,85,75,85,99,66,88,75,91
math,xuzheng,54,52,86,91,42,85,75
math,huangxiaoming,85,75,85,99,66,88,75,91
math,liujialing,85,86,41,75,93,42,85,75
english,huangxiaoming,85,86,41,75,93,42,85
algorithm,huangdatou,48,58,67,86,15,33,85
algorithm,huangzitao,85,86,41,75,93,42,85,75

一、数据解释

数据字段个数不固定:
第一个是课程名称,总共四个课程,computer,math,english,algorithm,
第二个是学生姓名,后面是每次考试的分数

二、统计需求:
1、统计每门课程的参加考试人数和课程平均分

2、统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件

3、求出每门课程参考学生成绩最高平均分的学生的信息:课程,姓名和平均分

答案

第1小题

统计每门课程的参考人数和课程平均分

涉及知识点: 去重, 自定义类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
//  ScoreBean 
package com.rox.mapreduce.mr3._02_分组组件;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor

public class ScoreBean implements WritableComparable<ScoreBean> {
private String courseName;
private String stuName;
private Double score;

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(courseName);
out.writeUTF(stuName);
out.writeDouble(score);
}
@Override
public void readFields(DataInput in) throws IOException {
this.courseName = in.readUTF();
this.stuName = in.readUTF();
this.score = in.readDouble();
}
@Override
/**
* 如果是相同课程, 按照分数降序排列的
* 如果是不同课程, 按照课程名称升序排列
*/
public int compareTo(ScoreBean o) {
// 测试一下只写按分数降序排序
// return o.getScore().compareTo(this.getScore());

/*// 首先分组(只在相同的组内进行比较)
int nameRes = this.getCourseName().compareTo(o.getCourseName());
if (nameRes == 0) {
// 课程相同的时候才进行降序排序
int scoreRes =
return scoreRes;
}
return nameRes;*/
return 0;
}

public String toString1() {
return stuName + "\t" + score;
}

@Override
public String toString() {
return courseName + "\t" + stuName
+ "\t" + score;
}

public ScoreBean(String stuName, Double score) {
super();
this.stuName = stuName;
this.score = score;
}
}

//  ScorePlusDemo1 
package com.rox.mapreduce.mr3._02_分组组件;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ScorePlusDemo1 {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://cs1:9000");
System.setProperty("HADOOP_USER_NAME", "ap");

Job job = Job.getInstance(conf);
job.setJarByClass(ScorePlusDemo1.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(ScoreBean.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

String inP = "/in/newScoreIn";
String outP = "/out/ans1";
FileInputFormat.setInputPaths(job, new Path(inP));
FileOutputFormat.setOutputPath(job, new Path(outP));

Path mypath = new Path(outP);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}

Boolean waitForComp = job.waitForCompletion(true);
System.exit(waitForComp?0:1);
}


static class MyMapper extends Mapper<LongWritable, Text, Text, ScoreBean> {
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 1.截取
String[] datas = value.toString().trim().split(",");
String courseName = datas[0].trim();
String stuName = datas[1].trim();
int sum = 0;
for (int i=2; i<datas.length; i++) {
sum += Integer.parseInt(datas[i]);
}
double avgScore = sum/(datas.length-2);
ScoreBean sb = new ScoreBean(courseName, stuName, avgScore);
k.set(courseName);
context.write(k, sb);
}
}


static class MyReducer extends Reducer<Text, ScoreBean, Text, Text> {

Text v = new Text();

@Override
protected void reduce(Text key, Iterable<ScoreBean> values,
Reducer<Text, ScoreBean, Text, Text>.Context context)
throws IOException, InterruptedException {

Set<String> stuNames = new HashSet<>();
int count = 0;
int sum = 0;
for (ScoreBean sb : values) {
stuNames.add(sb.getStuName());
count ++;
sum += sb.getScore();
}
int size = stuNames.size();
String val = size + "\t" + (double)sum/count;
v.set(val);
context.write(key, v);
}
}
}

// 执行结果 
algorithm 6 71.33333333333333
computer 10 69.6
english 8 66.0
math 7 72.57142857142857

第2小题

统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件

涉及知识点: 分区, 字符串组合key, Partitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package com.rox.mapreduce.mr3._02_分组组件;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
* @author shixuanji
* 注意: 此题因为数据中有2条 course 和 stuName相同的数据(english liuyifei), 所以必须再在reduce中继续去重一下, 再计算一下平均分
*
* 否则, 可以不用写reduce, 因为Mapper中已经把逻辑处理完了,可以直接输出

* 最终输出:
* computer liuyifei 43
* computer huanglei 63
* math liutao 64
* ...
*/
public class ScorePlusDemo2 {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

// 指定HDFS相关参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://cs1:9000");
System.setProperty("HADOOP_USER_NAME", "ap");

//  创建/配置 Job
Job job = Job.getInstance(conf);

// 设置Jar包类型
job.setJarByClass(ScorePlusDemo2.class);

// 设置Map Reduce执行类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

// 设置Map输出类
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);

// Reduce输出类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);

//  设置分区 
job.setPartitionerClass(MyPartition.class);
job.setNumReduceTasks(4);

// 设置输入 输出路径
String inP = "/in/newScoreIn";
String outP = "/out/scorePlus2";
FileInputFormat.setInputPaths(job, new Path(inP));
FileOutputFormat.setOutputPath(job, new Path(outP));

// 设置如果存在路径就删除
Path mypath = new Path(outP);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}

//  执行job
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion?0:-1);
}

===============================================================

static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
// 把 课程+学生 作为 key

Text k = new Text(); //只有输出String类型的, 才需要在这里设置Text
DoubleWritable v = new DoubleWritable();

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String[] datas = value.toString().trim().split(",");
String kStr = datas[0].trim() + "\t" + datas[1].trim();
int sum = 0;
for (int i = 2; i < datas.length; i++) {
sum += Integer.parseInt(datas[i]);
}
double avg = sum / (datas.length - 2);
k.set(kStr);
v.set(avg);
context.write(k, v);
}
}
===============================================================

/**
* @author shixuanji
* 注意: 此题因为数据中有2条 course 和 stuName相同的数据, 所以必须再在reduce中
* 继续去重一下, 再计算一下平均分
*
* 否则, 可以不用写reduce, 因为Mapper中已经把逻辑处理完了,可以直接输出
*/
static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

DoubleWritable v = new DoubleWritable();
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values,
Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {

/**
* 考虑到有 课程, 学生名相同, 后面的数据不同的情况, 这里再做一个平均求和
* 可以验证打印下
*/
int count = 0;
Double sum = 0.0;

for (DoubleWritable avg : values) {
if (count > 0) {
// 有key完全相同的情况才会进到这里
System.out.println("这是第" +count +"次, 说明课程和姓名有相同的两条数据\n课程姓名是: "+key.toString());
}
sum += avg.get();
count ++;
}
Double finAvg = sum/count;
v.set(finAvg);
context.write(key, v);
}
}
}

===============================================================
===============================================================
/**
* @author shixuanji
* 继承 Partitioner, 实现自定义分区
*/
class MyPartition extends Partitioner<Text, DoubleWritable> {

private static HashMap<String, Integer> courseMap = new HashMap<>();
static {
courseMap.put("algorithm", 0);
courseMap.put("computer", 1);
courseMap.put("english", 2);
courseMap.put("math", 3);
}

@Override
public int getPartition(Text key, DoubleWritable value, int numPartitions) {
// 取出Map输出的key中的前半部分--courseName
Integer code = courseMap.get(key.toString().trim().split("\t")[0]);
if (code != null) {
return code;
}
return 5;
}
}
===============================================================
===============================================================

 执行结果 
algorithm huangdatou 56.0
algorithm huangjiaju 82.0
algorithm huanglei 74.0
algorithm huangzitao 72.0
algorithm liutao 82.0
algorithm liuyifei 62.0
----------
computer huangbo 65.0
computer huangdatou 56.0
computer huangjiaju 83.0
computer huanglei 74.0
computer huangxiaoming 72.0
computer huangzitao 72.0
computer liujialing 64.0
computer liutao 83.0
computer liuyifei 62.0
computer xuzheng 65.0
---------
english huangbo 55.0
english huangdatou 56.0
english huanglei 83.0
english huangxiaoming 72.0
english liujialing 62.0
english liuyifei 66.5
english zhaobenshan 69.0
english zhouqi 64.0
------------
math huangjiaju 82.0
math huanglei 74.0
math huangxiaoming 83.0
math liujialing 72.0
math liutao 56.0
math wangbaoqiang 72.0
math xuzheng 69.0

第3小题

求出 每门课程参与考试的学生成绩 最高平局分 的学生的信息:课程,姓名和平均分


解题思路:

  • 通过题意得出2个结论
    • 课程要分组
    • 平均分要排序
  • 排序的话,交给key来做无疑是最好的,因为MapReduce自动key进行分组&排序
  • 因此可以把 课程&平均分 作为一个联合key
  • 为了操作方便,可以封装到一个对象中去: ScoreBean
  • 分组和排序需要在 ScoreBean重写的compareTo()方法中完成
  • 因为最后结果是求每门课程最高平均分,因此需要对课程进行分组。
  • 此时原本的默认分组(以Bean对象整体分组)就不管用了,需要自定义分组
  • 自定义分组要继承WritableComparator,重写compare()方法,指定分组的规则。
  • ScoreBean先按照组别进行排序,到reduce中时,已经是按照组,排好的数据,MyGroup 会把相同的比较结果放到同一个组中,分发到reduce.
  • reduce中,只需要取出每组的第一个元素输出到上下文即可


图示


涉及知识点: mr中key的作用,自定义对象的用法,自定义分组,mr的执行流程

  • 利用“班级和平均分”作为 key,可以将 map 阶段读取到的所有学生成绩数据按照班级 和成绩排倒序,发送到 reduce
  • 在 reduce 端利用 GroupingComparator 将班级相同的 kv 聚合成组,然后取第一个即是最 大值


先贴个结论:

执行流程结论

  • map每读一行就 write 到 context 一次,按照指定的key进行分发
  • map 把所有的数据都读完了之后,大概执行到67%的时候,开始进入 CustomBean,执行CustomBeancompareTo()方法,会按照自己写的规则一条一条数据比较

  • 上述都比较完毕之后,map阶段就结束了,此时来到了 reduce阶段,但是是到了67%

  • 到了reduce阶段,直接进入了MyGroup中自定义的compare方法。
  • MyGroup的compare()方法,如果返回非0, 就会进入 reduce 方法写出到context

MyGroup进入Reduce的条件是

  • MyReduce中,如果compare的结果不等于0,也就是比较的2者不相同, 此时就进入Reduce, 写出到上下文
  • 如果相同,会一直往下读,直到读到不同的, 此时写出读到上下文。
  • 因为MyGroup会在Reduce阶段执行,而CustomBean中的compareTo()是在map阶段执行,所以需要在CustomBean中就把组排好序,此时分组功能才能正常运作

指定分组类MyGroup和不指定的区别

指定与不指定是指:在Driver类中,是否加上job.setGroupingComparatorClass(MyGrouper.class);这一句。

  • 指定分组类
    • 会按照分组类中,自定义的compare()方法比较,相同的为一组,分完一组就进入一次reduce方法
  • 不指定分组类:(目前存疑)
    • 是否是按照key进行分组
    • 如果是自定义类为key,是否是按照此key中值相同的分为一组
    • 如果是hadoop内置类,是否是按照此类的值分组(Text-String的值,IntWritable-int值等..)
    • 依然是走得以上这套分组逻辑,一组的数据读完才进入到Reduce阶段做归并


代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// ScoreBean2 
package com.rox.mapreduce.mr3._02_分组组件;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter

public class ScoreBean2 implements WritableComparable<ScoreBean2> {
private String courseName;
private Double score;

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(courseName);
out.writeDouble(score);
}
@Override
public void readFields(DataInput in) throws IOException {
this.courseName = in.readUTF();
this.score = in.readDouble();
}
@Override
/**
* 如果是相同课程, 按照分数降序排列的
* 如果是不同课程, 按照课程名称升序排列
*/
public int compareTo(ScoreBean2 o) {
// 测试一下只写按分数降序排序
// return o.getScore().compareTo(this.getScore());

// 首先分组(只在相同的组内进行比较)
int nameRes = this.getCourseName().compareTo(o.getCourseName());
if (nameRes == 0) {
// 课程相同的时候才进行降序排序
int scoreRes = o.getScore().compareTo(this.getScore());
return scoreRes;
}
return nameRes;
}

/**
* 实际上ScoreBean中是包含所有的参数的, 这里的输出可以自己设置
*/
@Override
public String toString() {
return courseName + "\t" + score;
}
public ScoreBean2(String courseName, Double score) {
super();
this.courseName = courseName;
this.score = score;
}
public ScoreBean2() {
super();
}
}



// ScorePlusDemo3 

package com.rox.mapreduce.mr3._02_分组组件;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ScorePlusDemo3 {

 main 
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://cs1:9000");
System.setProperty("HADOOP_USER_NAME", "ap");

Job job = Job.getInstance(conf);
job.setJarByClass(ScorePlusDemo3.class);

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

job.setMapOutputKeyClass(ScoreBean2.class);
job.setMapOutputValueClass(Text.class);

job.setGroupingComparatorClass(MyGrouper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

String outP = "/out/scorePlus3";
FileInputFormat.setInputPaths(job, new Path("/in/newScoreIn"));
FileOutputFormat.setOutputPath(job, new Path(outP));

// 如果输出目录存在,就先删除
Path myPath = new Path(outP);
FileSystem fs = myPath.getFileSystem(conf);
if (fs.isDirectory(myPath)) {
fs.delete(myPath, true);
}

boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : -1);
}

 Mapper 
/**
* @author shixuanji
* 输出: key: course
* value: score ...
* 思路:
* 1.不同课程要分开展示, 以 课程+分数 作为key, 在mapper中完成排序
* 2.在reduce中按照 MyGrouper 完成分组
*/
static public class MyMapper
extends Mapper<LongWritable, Text, ScoreBean2, Text> {
private String[] datas;
Text v = new Text();

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
datas = value.toString().trim().split(",");

int sum = 0;
for (int i = 2; i < datas.length; i++) {
sum += Integer.parseInt(datas[i]);
}
double avg = (double) sum / (datas.length - 2);
ScoreBean2 sb = new ScoreBean2(datas[0].trim(), avg);

v.set(datas[1].trim());
context.write(sb, v);
}
}

 Redecer 
static public class MyReducer
extends Reducer<ScoreBean2, Text, Text, NullWritable> {

Text k = new Text();
int count = 1;

@Override
protected void reduce(ScoreBean2 key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
/**
* 如果没有其它问题
* 此时是按照课程分好组了, 同一个课程的所有学生都过来了, 并且学生成绩是排好的,
* 如果此时求最大值, 只需要取出第一个即可
*/
// 进来一次只取第一个
Text name = values.iterator().next();
k.set(key.getCourseName() + "\t" + name.toString() + "\t"
+ key.getScore());
context.write(k, NullWritable.get());
context.write(new Text("==================第"+count+"次进入reduce"), NullWritable.get());


/*context.write(new Text("==================第"+count+"次进入reduce"), NullWritable.get());
for (Text name : values) {
k.set(key.getCourseName() + "\t" + name.toString() + "\t"
+ key.getScore());
context.write(k, NullWritable.get());
context.write(new Text("---------in for write------"), NullWritable.get());
}*/
count++;
}
}
}

 MyGrouper 

/**
* @author shixuanji
* 自定义分组 需要继承一个类WritableComparator
* 重写compare方法
*/
class MyGrouper extends WritableComparator {

// WritableComparator 此方法的默认无参构造是不会创建对象的, 需要自己重写
public MyGrouper() {
// 中间省去的参数是 Configuration, 如果为空, 会创建一个新的
super(ScoreBean2.class, true);
}

/**
* 此处比较的是2个 WritableComparable 对象, 需要强转一下具体的类对象
*/
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
ScoreBean2 aBean = (ScoreBean2) a;
ScoreBean2 bBean = (ScoreBean2) b;
// 返回分组规则
System.out.println(aBean.getCourseName()+"---MyGroup中比较---"+(bBean.getCourseName()));
return aBean.getCourseName().compareTo(bBean.getCourseName());

}
}

================================================================================
 执行结果 
================================================================================

algorithm huangjiaju 82.28571428571429
==================第1次进入reduce
computer huangjiaju 83.2
==================第2次进入reduce
english huanglei 83.0
==================第3次进入reduce
math huangxiaoming 83.0
==================第4次进入reduce

MR实现两个表的数据关联Join

题目

订单数据表t_order: flag=0
id date pid amount
1001 20150710 P0001 2
1002 20150710 P0001 3
1003 20150710 P0002 3
Id:数据记录id
Date 日期
Pid 商品id
Amount 库存数量

6.商品信息表t_product flag=1
pid name category_id price
P0001 小米5 C01 2000
P0002 锤子T1 C01 3500

mr实现两个表的数据关联
id pid date amount name category_id price


答案1 : Reducer 端 实现 Join

思路

  • map端

    • 读取到当前路径下,所有文件的切片信息, 根据文件名判断是那张表

      • 在setup中,从文件切片中获取到文件名

        1
        2
        3
        4
        5
        6
        // 获取读取到的切片相关信息,一个切片对应一个 maptask
        InputSplit inputSplit = context.getInputSplit();
        // 转换为文件切片
        FileSplit fs = (FileSplit)inputSplit;
        // 获取文件名
        filename = fs.getPath().getName();
      • 这里总共会获得2个文件名(指定目录存了2个指定文件),一个文件名对应一个切片

    • 关联字段作为key, 其它的作为value,在value前面加上当前文件的名称标记

  • reduce端

    • 通过标记区分两张表,把读取到的信息,分别存入2个list中
    • 遍历大的表,与小表进行拼接(小表的相同pid记录只会有一条)
    • 拼接完成后即可写出

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package com.rox.mapreduce.mr3._03_join2表的数据关联;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class ReduceJoinDemo {
public static void main(String[] args) throws Exception {
// 指定HDFS相关参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://cs1:9000");
System.setProperty("HADOOP_USER_NAME", "ap");

//  创建/配置 Job
Job job = Job.getInstance(conf);

// 设置Jar包类型
job.setJarByClass(ReduceJoinDemo.class);

// 设置Map Reduce执行类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

// 设置Map输出类
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

// Reduce输出类
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// 设置输入 输出路径
String inP = "/in/joindemo";
String outP = "/out/joinout1";
FileInputFormat.setInputPaths(job, new Path(inP));
FileOutputFormat.setOutputPath(job, new Path(outP));

// 设置如果存在路径就删除
Path mypath = new Path(outP);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}

//  执行job
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion?0:-1);
}

/**
* @author shixuanji
* 思路: 读取2个表中的数据,进行标记发送
* key: 两表需要关联的字段
* value: 其它值, 需要标记, 标记数据的来源
*
*
* **核心: 关联条件**
- 想要在 reduce 端完成 join, 要在 reduce 端可以同时接收到两个表中的数据
- 要保证在 Map 端进行读文件的时候, 读到2个表的数据, 并且需要对2个表的数据进行区分
- 将2个表放在同一个目录下

解决:
mapper 开始执行时, 在setup方法中, 从上下文中取到文件名, 根据文件名打标记
*/
static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

String filename = "";
Text k = new Text();
Text v = new Text();

@Override
protected void setup(
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// 获取读取到的切片相关信息,一个切片对应一个 maptask
InputSplit inputSplit = context.getInputSplit();
// 转换为文件切片
FileSplit fs = (FileSplit)inputSplit;
// 获取文件名
filename = fs.getPath().getName();
System.out.println("本次获取到的文件名为-----"+filename);
}


@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
// 解析出来每一行内容, 打标记, 发送
String[] infos = value.toString().split("\t");

if (filename.equals("order")) {
k.set(infos[2]);
// 设置标记前缀为 OR
v.set("OR"+infos[0]+"\t"+infos[1]+"\t"+infos[3]);
}else {
k.set(infos[0]);
// 设置标记前缀为 PR
v.set("PR"+infos[1]+"\t"+infos[2]+"\t"+infos[3]);
}
context.write(k, v);
}
}


static class MyReducer extends Reducer<Text, Text, Text, NullWritable> {

Text k = new Text();

@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, NullWritable>.Context context)
throws IOException, InterruptedException {

/**
* 首先明确 product 和 order 是 一对多的关系
* 根据前缀不同,取到2个不同的表存进2个容器中
* 遍历多的表, 与一进行拼接
* 最后写出到上下文
* 最终的输出格式 id pid date amount name category_id price
*/
// 因为每次遍历到不同的pid, 都会走进来一次, list也会有新的输出,所以必须定义在里面,每次进来都要初始化
List<String> productList =new ArrayList<>();
List<String> orderList =new ArrayList<>();

for (Text v : values) {
String vStr = v.toString();
if (vStr.startsWith("OR")) {
orderList.add(vStr.substring(2));
}else {
productList.add(vStr.substring(2));
}
}

// 此时2个list添加完了本次 相同的 key(pid) 的所有商品
// 遍历多的进行拼接
for (String or : orderList) {
// 相同的 pid的 product 只有一个, productList中的数量是1
// 但是相同pid 的 订单 可能有多个
String res = key.toString() + "\t" + or + productList.get(0);
k.set(res);
context.write(k, NullWritable.get());
}
}
}
}


※ 答案2 : Mapper 端实现 Join

思路

  • 创建job的时候,把小表加入缓存 在map的setup中,
  • 读取缓存中的数据, 存入一个成员变量 map中
    • map方法中,只需要读一个表, 然后根据关联条件(关联key: pid)消除笛卡尔集,进行拼接
    • map直接输出, 甚至都不需要reduce

注意点:

  • 需要达成jar包运行, 直接用Eclipse会找不到缓存

  • jar包执行方法

    1
    2
    # 如果代码内部指定了输入输出路径,后面的/in,/out参数可以不加
    hadoop jar xxxx.jar com.rox.xxx.xxxx(主方法) /in/xx /out/xx
  • 如果没有Reduce方法

    • main方法中,设置map的写出key,value,应该用 setOutputKeyClass

      1
      2
      3
      //// 设置Map输出类 (因为这里没有Reduce, 所以这里是最终输出,一定要注意!!!)//////////
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);
    • 要设置reduce task 的个数为0

      1
      job.setNumReduceTasks(0);
    • 把小文件加载到缓存中的方法

      1
      2
      ////////////// 将小文件加载到缓存  
      job.addCacheFile(new URI("/in/joindemo/product"));

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package com.rox.mapreduce.mr3._03_join;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class MapJoinDemo {
public static void main(String[] args) throws Exception {
// 指定HDFS相关参数
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://cs1:9000");
System.setProperty("HADOOP_USER_NAME", "ap");

//  创建/配置 Job
Job job = Job.getInstance(conf);

// 设置Jar包类型:这里千万别写错了
job.setJarByClass(MapJoinDemo.class);

// 设置Map Reduce执行类
job.setMapperClass(MyMapper.class);

///////////// 设置Map输出类 (因为这里没有Reduce, 所以这里是最终输出,一定要注意!!!)//////////
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

////////////// 设置reduce执行个数为0
job.setNumReduceTasks(0);

////////////// 将小文件加载到缓存
job.addCacheFile(new URI("/in/joindemo/product"));

// 设置输入 输出路径
String inP = "/in/joindemo/order";
String outP = "/out/joinout2";
FileInputFormat.setInputPaths(job, new Path(inP));
FileOutputFormat.setOutputPath(job, new Path(outP));

// 设置如果存在路径就删除
Path mypath = new Path(outP);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
//  执行job
System.exit(job.waitForCompletion(true)?0:-1);
}

/**
* @author shixuanji
* 思路:
* 创建job的时候,把小表加入缓存
* 在map的setup中, 读取缓存中的数据, 存入一个成员变量 map中
* map方法中,只需要读一个表, 然后根据关联条件(关联key: pid)消除笛卡尔集,进行拼接
* 直接输出, 甚至都不需要reduce
*
* 注意点:
* 需要达成jar包运行, 直接用Eclipse会找不到缓存
* 格式: hadoop jar包本地路径 jar包主方法全限定名 hadoop输入 hadoop输出
*/
static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

// 创建装载小表的map, key存储 关联键, value存其它
Map<String, String> proMap = new HashMap<>();

@Override
protected void setup(Context context)
throws IOException, InterruptedException {
// 获取缓存中存储的小表 (一般是 一对多中的 一), 因为只存了1个,所以直接取第0个
Path path = context.getLocalCacheFiles()[0];
String pString = path.toString();
// 开启in流, BufferedReader 逐行读取文件
BufferedReader br = new BufferedReader(new FileReader(pString));
String line = null;
while ((line = br.readLine()) != null) {
// 成功读取一行
String[] infos = line.split("\t");
// 存进proMap
proMap.put(infos[0],
infos[1] + "\t" + infos[2] + "\t" + infos[3]);
}
// br.close();
}

/**
* 直接从路径读取大文件
*/
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String[] infos = value.toString().split("\t");
String pid = infos[2];
//进行关联 pid到map中匹配 如果包含 证明匹配上了
// 艹, 这里pid之前加了 "", 妈的,当然找不到啦!!!
if (proMap.containsKey(pid)) {
String res = value.toString() + "\t" + proMap.get(pid);
k.set(res);
context.write(k, NullWritable.get());
}
}
}
}

如果帮到你, 可以给我赞助杯咖啡☕️
0%