publicclassLoadDataToHBase{
publicstaticclassLoadDataToHBaseMapperextends
Mapper{
publicstaticinty,m,d;
Calendarcal=Calendar.getInstance();
//map的key用一个immutableBytesWritable类型的无意义的key,map的value是直接将原来的一行记录输出,
//map完成后会shuffle和sort,将key-value按照key排序,否则写不进hfile,hfile要求后写的key不能小于先写的key
privateImmutableBytesWritableimmutableBytesWritable=newImmutableBytesWritable();
protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
immutableBytesWritable.set(Bytes.toBytes(key.get()));
context.write(immutableBytesWritable,value);
}
}
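// Each value the reducer receives from the map output is one line of the input file.
// The reducer's key carries no meaning either; every value is turned into HFile-format output consisting of rowkey, family, qualifier, timestamp and value.
// The rowkey is composed of "year, month, day + the line number of that line"; the line number is prepended to each line during preprocessing.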
publicstaticclassLoadDataToHBaseReducerextends
Reducer[I]{
publicstaticinty,m,d;
Calendarcal=Calendar.getInstance();
protectedvoidreduce(ImmutableBytesWritablekey,Iterablevalues,
Contextcontext)
throwsIOException,InterruptedException{
Stringvalue="";
while(values.iterator().hasNext())
{
value=values.iterator().next().toString();
if(value!=null&&!"".equals(value))
{
Listlist=newArrayList();
list=createKeyValue(value.toString());
Iteratorit=list.iterator();
while(it.hasNext()){
KeyValuekv=newKeyValue();
kv=it.next();
if(kv!=null){
context.write(key,kv);
}
}
}
}
}
privateListcreateKeyValue(Stringstr){
Listlist=newArrayList();
String[]values=str.toString().split(",");
String[]qualifiersName=CONSTANT.qualifiersName;
//longtimeStamp=CONSTANT.timeStamp;
for(inti=1;i
报错
16/03/24 18:12:18 INFO mapreduce.Job: map 100% reduce 0%
16/03/24 18:12:26 INFO mapreduce.Job: Task Id : attempt_1458611567937_0017_r_000000_0, Status : FAILED
Error: java.io.IOException: Added a key not lexically larger than previous key=\x00\x09201622410\x04info00_selfnumber\x00\x00\x01S\xA8\x1C\xB8h\x04, lastkey=\x00\x0820162249\x04info11_privilege\x00\x00\x01S\xA8\x1C\xB8g\x04
    at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:202)
    at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:288)
    at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:253)
    at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:935)
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:196)
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:149)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at test1.LoadDataToHBase$LoadDataToHBaseReducer.reduce(LoadDataToHBase.java:69)
    at test1.LoadDataToHBase$LoadDataToHBaseReducer.reduce(LoadDataToHBase.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
分 -->
|