问题描述:集群中原有采集程序从源文件写入 HBase 时出现积压,优化修改程序均无济于事,需要尽快出一个方案解决。
问题解决:集群中的采集程序另有一条链路是从源文件写入 HDFS 的,因此计划以 HDFS 里的数据为源数据,用 MapReduce 生成 HFile,再通过 bulkload 的方式写入 HBase,从而避免了对原始数据的重复清洗操作。
以下是开发的程序:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * 以 HDFS 上 '|' 分隔的文本文件为源,通过 MapReduce 生成 HFile,
 * 再用 bulkload(LoadIncrementalHFiles)方式导入 HBase 表 hbase_test,
 * 绕过逐条 Put 的写入瓶颈。
 */
public class HFileGenerator {

    // 路径/表名/列族等集中定义,避免魔法字符串散落各处
    private static final String INPUT_PATH  = "hdfs://lip1:8020/user/lipeng/hbase/input";
    private static final String OUTPUT_PATH = "hdfs://lip1:8020/user/lipeng/hbase/output";
    private static final String TABLE_NAME  = "hbase_test";
    // 用 Bytes.toBytes 而不是 String.getBytes():后者依赖平台默认字符集
    private static final byte[] FAMILY    = Bytes.toBytes("f1");
    private static final byte[] QUALIFIER = Bytes.toBytes("column1");
    private static final String ROWKEY_SEPARATOR = "_";
    private static final int ROWKEY_FIELD_COUNT = 5;

    /**
     * Mapper:把一行 '|' 分隔的文本转为 (rowkey, KeyValue)。
     * rowkey 由前 5 个字段用 '_' 拼接;整行原文作为 f1:column1 的值。
     */
    public static class HFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // 跳过空行(Text.toString() 不会返回 null,只需判空串)
            if (line == null || line.isEmpty()) {
                return;
            }
            String[] items = line.split("\\|", -1);
            // 防御:字段不足 5 个时丢弃并计数,否则 items[4] 会抛
            // ArrayIndexOutOfBoundsException 导致整个 Job 失败
            if (items.length < ROWKEY_FIELD_COUNT) {
                context.getCounter("HFileGenerator", "MALFORMED_LINES").increment(1);
                return;
            }
            // 根据业务需要组合 rowkey:前 5 个字段用 '_' 连接
            StringBuilder rowkeyBuilder = new StringBuilder(items[0]);
            for (int i = 1; i < ROWKEY_FIELD_COUNT; i++) {
                rowkeyBuilder.append(ROWKEY_SEPARATOR).append(items[i]);
            }
            byte[] row = Bytes.toBytes(rowkeyBuilder.toString());
            // 不在 map 热路径上做 System.out 打印:每条记录一次 I/O 会拖垮吞吐
            KeyValue kv = new KeyValue(row, FAMILY, QUALIFIER,
                    System.currentTimeMillis(), Bytes.toBytes(line));
            context.write(new ImmutableBytesWritable(row), kv);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // try-with-resources 保证 Connection 和 Table 都被关闭
        // (原代码只关了 table,Connection 泄漏)
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(TABLE_NAME))) {

            Job job = Job.getInstance(conf);
            job.setJobName("HFile bulk load test");
            job.setJarByClass(HFileGenerator.class);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(KeyValue.class);
            job.setMapperClass(HFileMapper.class);
            job.setReducerClass(KeyValueSortReducer.class);
            job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

            // MapReduce 要求输出目录不存在:存在则先递归删除
            Path outputPath = new Path(OUTPUT_PATH);
            FileSystem fileSystem = outputPath.getFileSystem(conf);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }
            FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job, outputPath);
            // 按目标表的 region 边界配置 TotalOrderPartitioner / reducer 数量
            HFileOutputFormat.configureIncrementalLoad(job, (HTable) table);

            // Job 失败时以非零码退出;不再用 catch(Exception){printStackTrace()}
            // 吞掉异常——那样失败也会以退出码 0 结束,调度系统无法感知
            if (!job.waitForCompletion(true)) {
                System.exit(1);
            }

            // 将输出目录赋 777 权限,避免 RegionServer 移动 HFile 时权限不足
            FsShell shell = new FsShell(conf);
            try {
                shell.run(new String[]{"-chmod", "-R", "777", OUTPUT_PATH});
            } catch (Exception e) {
                // 保留原始异常作为 cause,不丢失堆栈
                throw new IOException("chmod -R 777 " + OUTPUT_PATH + " failed", e);
            }
            // 将生成的 HFile 加载进 HBase 表
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(outputPath, (HTable) table);
        }
    }
}
执行时需要把 HBase 的 classpath 添加到 Hadoop 的 hadoop-env.sh 中(例如 export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:`hbase classpath`),否则运行时会报找不到 HBase 相关类的错误。