1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
| import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class Test {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1); private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String string = value.toString(); String[] strings = string.split("\n"); for (int i = 0; i < strings.length; i++) { String[] data = strings[i].split(" "); double score = Double.parseDouble(data[4]); if (score < 60) { k.set("不及格"); v.set(1); } else if (score >= 60 && score < 70) { k.set("一般"); v.set(1); } else if (score >= 70 && score < 80) { k.set("中等"); v.set(1); } else if (score >= 80 && score < 90) { k.set("良好"); v.set(1); } else if (score >= 90) { k.set("优秀"); v.set(1); } context.write(k, v); } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable v : values) { int value = v.get(); count += value; } context.write(key, new IntWritable(count)); } } public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); job.setJarByClass(RatingDemo.class); job.setMapperClass(MyMap.class); job.setReducerClass(MyReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path input = new Path("/input/report.txt"); FileInputFormat.addInputPath(job, input); Path output = new Path("/output/result"); if (output.getFileSystem(configuration).exists(output)) { output.getFileSystem(configuration).delete(output); } FileOutputFormat.setOutputPath(job, output); boolean completion = job.waitForCompletion(true); if (completion) { System.out.println("计算成功!"); } else { System.out.println("计算失败!"); } } } }
|