Hadoop MapReduce Question

Please read the following two programs.

Program 1:
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

public class averagenumber {
    public static class AMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        // map function
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // convert the plain-text input to a String
            String line = value.toString();

            // first split the input by line
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");

            // handle each line separately
            while (tokenizerArticle.hasMoreElements()) {
                // split each line on whitespace
                StringTokenizer tokenizerLine =
                        new StringTokenizer(tokenizerArticle.nextToken());

                String strName = tokenizerLine.nextToken();  // student name field
                String strScore = tokenizerLine.nextToken(); // score field

                Text name = new Text(strName);
                int scoreInt = Integer.parseInt(strScore);
                // emit name and score
                context.write(name, new IntWritable(scoreInt));
            }
        }
    }

    public static class AReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        // reduce function
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            int count = 0;
            int sum = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                sum += it.next().get();
                count++;
            }
            int average = sum / count;
            context.write(key, new IntWritable(average));
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: average <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "AverageNumber");
        job.setJarByClass(averagenumber.class);
        job.setMapperClass(AMapper.class);
        job.setCombinerClass(AReducer.class);
        job.setReducerClass(AReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
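For context, both programs assume each input line holds a student name and an integer score separated by whitespace. A hypothetical sample input (names and values invented for illustration, not from the original post):

zhangsan 80
zhangsan 90
lisi 70

On such input, program 1 should write integer averages, e.g. zhangsan 85 and lisi 70 (integer division truncates any fraction).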
Program 2:
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

public class averagenumber {
    public static class AMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        // map function (identical to program 1)
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // convert the plain-text input to a String
            String line = value.toString();

            // first split the input by line
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");

            // handle each line separately
            while (tokenizerArticle.hasMoreElements()) {
                // split each line on whitespace
                StringTokenizer tokenizerLine =
                        new StringTokenizer(tokenizerArticle.nextToken());

                String strName = tokenizerLine.nextToken();  // student name field
                String strScore = tokenizerLine.nextToken(); // score field

                Text name = new Text(strName);
                int scoreInt = Integer.parseInt(strScore);
                // emit name and score
                context.write(name, new IntWritable(scoreInt));
            }
        }
    }

    public static class AReducer extends
            Reducer<Text, IntWritable, Text, FloatWritable> {

        // reduce function
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            int count = 0;
            int sum = 0;
            Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                sum += it.next().get();
                count++;
            }
            // note: sum / count is still integer division; the result is
            // only widened to float afterwards
            float average = sum / count;
            context.write(key, new FloatWritable(average));
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: average <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "AverageNumber");
        job.setJarByClass(averagenumber.class);
        job.setMapperClass(AMapper.class);
        job.setCombinerClass(AReducer.class);
        job.setReducerClass(AReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
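For comparison, program 2 differs from program 1 only in the reducer's output value type (FloatWritable instead of IntWritable), the average being stored in a float, and the job's setOutputValueClass call; the mapper in both programs still emits IntWritable values. When a job's intermediate (map) output types differ from its final output types like this, the org.apache.hadoop.mapreduce.Job API allows declaring them separately. A minimal sketch of those driver calls, reusing the classes above (these lines are not in the original post):

// Sketch: declare map output types separately from final output types.
job.setMapOutputKeyClass(Text.class);          // keys emitted by AMapper
job.setMapOutputValueClass(IntWritable.class); // values emitted by AMapper
job.setOutputKeyClass(Text.class);             // final keys from AReducer
job.setOutputValueClass(FloatWritable.class);  // final values from AReducer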
Can anyone tell me why the second program produces no output?? Hoping an expert can come to the rescue.