云计算技术-MapReduce编程样例

WordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static class doMapper extends Mapper<Object, Text, Text, IntWritable>{  
//第一个Object表示输入key的类型;第二个Text表示输入value的类型
//第三个Text表示表示输出键的类型;第四个IntWritable表示输出值的类型
public static final IntWritable one = new IntWritable(1);
public static Text word = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException,InterruptedException
//抛出异常
{
StringTokenizer tokenizer = new StringTokenizer(value.toString(),"\t");
//StringTokenizer是Java工具包中的一个类,用于将字符串进行拆分
word.set(tokenizer.nextToken());
//返回当前位置到下一个分隔符之间的字符串
context.write(word, one);
//将word存到容器中,记一个数
}
}

public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
//参数同Map一样,依次表示是输入键类型,输入值类型,输出键类型,输出值类型
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
//for循环遍历,将得到的values值累加
result.set(sum);
context.write(key, result);
}
}

样卷例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Test {
/*
* MapReduceBase类:实现Mapper和Reducer接口的基类
* Mapper接口:
* WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。
*/
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
/*
*LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,
*都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。
*/
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值

/*
* Mapper接口中的map方法:
* void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
* 映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对
* 中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
* OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。
* OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output
* Reporter 用于报告整个应用的运行进度
*/

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//每一行包括课程号、课程名称、学号、学生姓名、成绩(百分制),数据项由空格隔开。
String string = value.toString();
//按行分割
String[] strings = string.split("\n");
for (int i = 0; i < strings.length; i++) {
//以空格分割
String[] data = strings[i].split(" ");
//数组第5个元素是成绩
double score = Double.parseDouble(data[4]);
//以成绩进行分割
if (score < 60) {
k.set("不及格");
v.set(1);
} else if (score >= 60 && score < 70) {
k.set("一般");
v.set(1);
} else if (score >= 70 && score < 80) {
k.set("中等");
v.set(1);
} else if (score >= 80 && score < 90) {
k.set("良好");
v.set(1);
} else if (score >= 90) {
k.set("优秀");
v.set(1);
}
//生成key value键值对
context.write(k, v);
}
}

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

/*
* reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例):
* (优秀[1,1,1,1...])
* 作为reduce的输入
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
//遍历values
for (IntWritable v : values) {
int value = v.get();
count += value;
}
//汇总每个分数段的人数(求和)
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws Exception {
// 1 获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置jar加载路径
job.setJarByClass(RatingDemo.class);
// 3 设置map和reduce类
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
// 4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
Path input = new Path("/input/report.txt");
FileInputFormat.addInputPath(job, input);
Path output = new Path("/output/result");
//判断output文件夹是否存在,如果存在则删除
if (output.getFileSystem(configuration).exists(output)) {
output.getFileSystem(configuration).delete(output);
}
FileOutputFormat.setOutputPath(job, output);
//等待计算完成
boolean completion = job.waitForCompletion(true);
if (completion) {
System.out.println("计算成功!");
} else {
System.out.println("计算失败!");
}
}
}
}

利用MapReduce的一些特性可以实现排序、去重、求和等等操作